// Copyright Epic Games, Inc. All Rights Reserved. #include "hydration.h" #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #if ZEN_WITH_TESTS # include # include # include # include # include #endif // ZEN_WITH_TESTS namespace zen { using namespace std::literals; namespace hydration_impl { /// UTC time decomposed to calendar fields with sub-second milliseconds. struct UtcTime { std::tm Tm{}; int Ms = 0; // sub-second milliseconds [0, 999] static UtcTime Now() { std::chrono::system_clock::time_point TimePoint = std::chrono::system_clock::now(); std::time_t TimeT = std::chrono::system_clock::to_time_t(TimePoint); int SubSecMs = static_cast((std::chrono::duration_cast(TimePoint.time_since_epoch()) % 1000).count()); UtcTime Result; Result.Ms = SubSecMs; #if ZEN_PLATFORM_WINDOWS gmtime_s(&Result.Tm, &TimeT); #else gmtime_r(&TimeT, &Result.Tm); #endif return Result; } }; std::filesystem::path FastRelativePath(const std::filesystem::path& Root, const std::filesystem::path& Abs) { auto [_, ItAbs] = std::mismatch(Root.begin(), Root.end(), Abs.begin(), Abs.end()); std::filesystem::path RelativePath; for (auto I = ItAbs; I != Abs.end(); I++) { RelativePath = RelativePath / *I; } return RelativePath; } void CleanDirectory(WorkerThreadPool& WorkerPool, std::atomic& AbortFlag, std::atomic& PauseFlag, const std::filesystem::path& Path) { CleanDirectoryResult Result = CleanDirectory(WorkerPool, AbortFlag, PauseFlag, Path, std::vector{}, {}, 0); for (const auto& [FailedPath, Ec] : Result.FailedRemovePaths) { ZEN_WARN("Failed to remove '{}' while cleaning '{}': {}", FailedPath, Path, Ec.message()); } } // Returns true if RelKey matches any wildcard in Excludes. Excluded paths are // dropped at the dehydrate-side directory scan and never enter the manifest. 
bool IsExcluded(std::string_view RelKey, std::span Excludes) { for (const std::string& W : Excludes) { if (MatchWildcard(W, RelKey, /*CaseSensitive*/ true)) { return true; } } return false; } std::vector ParseStringArray(CbFieldView Field) { std::vector Out; for (CbFieldView Entry : Field.AsArrayView()) { Out.emplace_back(Entry.AsString()); } return Out; } // Built-in exclude wildcards applied unless the hub config supplies an explicit // `excludes` array (an empty array opts out of all defaults). Patterns match the // dehydrate-side relative path (forward-slash form). `*` is path-separator-agnostic // per zenutil/wildcard.h. std::vector DefaultExcludes() { return { ".sentry-native/*", // sentry-native crash uploader DB; locked while child runs "state_marker", // root-level liveness marker (zenstorageserver.cpp) ".lock", // FILE_FLAG_DELETE_ON_CLOSE lock; locked while child runs "*.bak", // transient backups produced by atomic file replace "gc/reserve.gc", // GC disk reserve (gc subdir per zenstorageserver.cpp) "auth/*", // encrypted auth state under auth/ }; } /////////////////////////////////////////////////////////////////////// // Hydration / dehydration statistics. Atomics so they are safe to update // from parallel worker lambdas. Summary is emitted once after the operation // completes (success or failure). struct PhaseStats { std::atomic Files{0}; // host-side: count of work scheduled in this phase std::atomic Bytes{0}; // lambda-side: bytes transferred on successful completion std::atomic ElapsedUs{0}; // wall time around Work.Wait() // Per-request timing gathered inside the work lambdas. RequestCount + RequestTotalUs are // summed across all completed requests. RequestMaxUs is the slowest single request // observed (CAS-updated in EndRequest); avg vs max gap surfaces tail latency without // keeping per-request samples. 
std::atomic RequestCount{0}; std::atomic RequestTotalUs{0}; std::atomic RequestMaxUs{0}; std::atomic InFlight{0}; std::atomic InFlightPeak{0}; // Scheduling latency. PhaseClock starts when the phase begins. FirstScheduleUs / // FirstStartUs are the relative times of the earliest ScheduleWork call and the earliest // worker lambda entry. Their difference is how long requests sat in the pool backlog // before a worker picked one up (pool warm-up / backlog). Stopwatch PhaseClock; std::atomic FirstScheduleUs{UINT64_MAX}; std::atomic FirstStartUs{UINT64_MAX}; void RecordScheduled() { uint64_t Now = PhaseClock.GetElapsedTimeUs(); uint64_t Existing = FirstScheduleUs.load(std::memory_order_relaxed); while (Now < Existing && !FirstScheduleUs.compare_exchange_weak(Existing, Now, std::memory_order_relaxed)) { } } // Returns a Stopwatch the caller runs across the actual request; call EndRequest with // the elapsed microseconds when the request completes. Stopwatch BeginRequest() { uint64_t Now = PhaseClock.GetElapsedTimeUs(); uint64_t Existing = FirstStartUs.load(std::memory_order_relaxed); while (Now < Existing && !FirstStartUs.compare_exchange_weak(Existing, Now, std::memory_order_relaxed)) { } uint32_t Current = InFlight.fetch_add(1, std::memory_order_relaxed) + 1; uint32_t Peak = InFlightPeak.load(std::memory_order_relaxed); while (Current > Peak && !InFlightPeak.compare_exchange_weak(Peak, Current, std::memory_order_relaxed)) { } return Stopwatch{}; } void EndRequest(uint64_t ElapsedUsValue) { InFlight.fetch_sub(1, std::memory_order_relaxed); RequestCount.fetch_add(1, std::memory_order_relaxed); RequestTotalUs.fetch_add(ElapsedUsValue, std::memory_order_relaxed); uint64_t Existing = RequestMaxUs.load(std::memory_order_relaxed); while (ElapsedUsValue > Existing && !RequestMaxUs.compare_exchange_weak(Existing, ElapsedUsValue, std::memory_order_relaxed)) { } } }; struct DehydrateStatistics { PhaseStats Hash; PhaseStats Upload; // Loose CAS PUTs PhaseStats Touch; // Mod-time 
refresh on pre-existing loose CAS entries; shares Upload's ParallelWork PhaseStats PackUpload; // Pack-blob PUTs; shares Upload's ParallelWork PhaseStats PackTouch; // Mod-time refresh on pre-existing pack blobs; shares Upload's ParallelWork std::atomic LoadStateUs{0}; std::atomic DirScanUs{0}; std::atomic ListExistingUs{0}; std::atomic SaveMetadataUs{0}; std::atomic CleanUs{0}; std::atomic TotalFiles{0}; std::atomic TotalBytes{0}; std::atomic TotalUs{0}; // Pack phase stats std::atomic PackCount{0}; // number of packs built std::atomic PackedFiles{0}; // Files[] entries folded into packs (includes hash-duplicates) std::atomic PackBytes{0}; // total bytes across all packs std::atomic PackBuildUs{0}; // hash + write cost across all packs }; struct HydrateStatistics { PhaseStats Download; // Loose CAS GETs PhaseStats PackDownload; // Pack-blob GETs; shares Download's ParallelWork std::atomic LoadMetadataUs{0}; std::atomic CreateDirsUs{0}; std::atomic CreateDirsCount{0}; // unique parent dirs passed to CreateDirectories std::atomic CleanUs{0}; std::atomic FinalizeUs{0}; std::atomic BuildStateUs{0}; std::atomic TotalFiles{0}; std::atomic TotalBytes{0}; std::atomic TotalUs{0}; // Pack phase stats std::atomic PackCount{0}; // packs in manifest std::atomic PackedFiles{0}; // files unpacked into ServerStateDir (per-destination count) std::atomic PackUnpackUs{0}; // slice + parallel SafeWriteFile cost // Bytes written to disk during the unpack-and-slice phase (sum of slice sizes // touched by SafeWriteFile). Pairs with PackUnpackUs for disk-write throughput. std::atomic UnpackWriteBytes{0}; }; // Bits-per-second rate computed at microsecond precision. Zero-safe. inline uint64_t BitsPerSecond(uint64_t Bytes, uint64_t ElapsedUs) { if (ElapsedUs == 0) { return 0; } return Bytes * 8 * 1'000'000ull / ElapsedUs; } /////////////////////////////////////////////////////////////////////// // Per-module storage interface driven by IncrementalHydrator. 
// Abstract per-module backing store for dehydrated state.
//
// A store holds two kinds of data:
//   - a metadata object (SaveMetadata / LoadMetadata), and
//   - a content-addressed set of blobs keyed by IoHash (List / Put / Get / Touch).
//
// In the implementations below, Put, Get and Touch enqueue their work on the
// supplied ParallelWork / WorkerThreadPool and return without waiting. Results and
// errors surface to whoever waits on that ParallelWork.
class StorageBase
{
public:
	virtual ~StorageBase() = default;

	// Human-readable location string used in log output.
	virtual std::string Describe() const = 0;
	virtual void		SaveMetadata(const CbObject& Data) = 0;
	// Returns an empty CbObject when no metadata has been stored yet (both implementations).
	virtual CbObject LoadMetadata() = 0;

	// Backend-specific settings that need to be persisted in state.cbo and reapplied
	// on hydrate. Today only S3 uses this (MultipartChunkSize - the chunk size used at
	// dehydrate must be carried forward so hydrate uses the same partitioning). File
	// backend has no such settings and returns / accepts an empty object.
	virtual CbObject GetSettings() = 0;
	virtual void	 ParseSettings(const CbObjectView& Settings) = 0;

	// Hashes of the blobs currently present in the store's CAS area.
	virtual std::vector List() = 0;
	// Uploads the file at SourcePath (Size bytes) as blob Hash.
	virtual void Put(ParallelWork& Work, WorkerThreadPool& WorkerPool, const IoHash& Hash, uint64_t Size, const std::filesystem::path& SourcePath, PhaseStats& Stats) = 0;
	// Downloads blob Hash (Size bytes) into DestinationPath.
	virtual void Get(ParallelWork& Work, WorkerThreadPool& WorkerPool, const IoHash& Hash, uint64_t Size, const std::filesystem::path& DestinationPath, PhaseStats& Stats) = 0;
	// Refreshes an existing blob (mod-time refresh, per DehydrateStatistics::Touch).
	virtual void Touch(ParallelWork& Work, WorkerThreadPool& WorkerPool, const IoHash& Hash, PhaseStats& Stats) = 0;
	// Removes everything this store holds for the module.
	virtual void Delete(ParallelWork& Work, WorkerThreadPool& WorkerPool) = 0;
};

// Local-filesystem store.
// Layout: <ModulePath>/current-state.cbo holds the metadata, and <ModulePath>/cas/<hash>
// holds one file per blob. Touch is a no-op for this backend.
class FileStorage : public StorageBase
{
public:
	static constexpr std::string_view Prefix = "file://";
	static constexpr std::string_view Type	 = "file";

	explicit FileStorage(std::filesystem::path ModulePath);
	virtual std::string Describe() const override { return fmt::format("file://{}"sv, m_StoragePath.generic_string()); }
	virtual void		SaveMetadata(const CbObject& Data) override;
	virtual CbObject	LoadMetadata() override;
	virtual CbObject	GetSettings() override { return {}; }
	virtual void		ParseSettings(const CbObjectView&) override {}
	virtual std::vector List() override;
	virtual void Put(ParallelWork& Work, WorkerThreadPool& WorkerPool, const IoHash& Hash, uint64_t Size, const std::filesystem::path& SourcePath, PhaseStats& Stats) override;
	virtual void Get(ParallelWork& Work, WorkerThreadPool& WorkerPool, const IoHash& Hash, uint64_t Size, const std::filesystem::path& DestinationPath, PhaseStats& Stats) override;
	virtual void Touch(ParallelWork&, WorkerThreadPool&, const IoHash&, PhaseStats&) override {}
	virtual void Delete(ParallelWork& Work, WorkerThreadPool& WorkerPool) override;

private:
	std::filesystem::path m_StoragePath;
	std::filesystem::path m_StatePathName;
	std::filesystem::path m_CASPath;
};

// S3-backed store.
// Layout: <KeyPrefix>/incremental-state.cbo holds the metadata, and <KeyPrefix>/cas/<hash>
// holds one object per blob. Objects of at least 1.25 x m_MultipartChunkSize are
// transferred in m_MultipartChunkSize pieces (multipart PUT / ranged GETs).
class S3Storage : public StorageBase
{
public:
	static constexpr std::string_view Prefix = "s3://";
	static constexpr std::string_view Type	 = "s3";

	S3Storage(S3Client& Client, std::string KeyPrefix, std::filesystem::path TempDir, uint64_t MultipartChunkSize);

	virtual std::string Describe() const override { return fmt::format("s3://{}/{}"sv, m_Client.BucketName(), m_KeyPrefix); }
	virtual void		SaveMetadata(const CbObject& Data) override;
	virtual CbObject	LoadMetadata() override;
	virtual CbObject	GetSettings() override;
	virtual void		ParseSettings(const CbObjectView& Settings) override;
	virtual std::vector List() override;
	virtual void Put(ParallelWork& Work, WorkerThreadPool& WorkerPool, const IoHash& Hash, uint64_t Size, const std::filesystem::path& SourcePath, PhaseStats& Stats) override;
	virtual void Get(ParallelWork& Work, WorkerThreadPool& WorkerPool, const IoHash& Hash, uint64_t Size, const std::filesystem::path& DestinationPath, PhaseStats& Stats) override;
	virtual void Touch(ParallelWork& Work, WorkerThreadPool& WorkerPool, const IoHash& Hash, PhaseStats& Stats) override;
	virtual void Delete(ParallelWork& Work, WorkerThreadPool& WorkerPool) override;

private:
	S3Client&			  m_Client;
	std::string			  m_KeyPrefix;
	std::filesystem::path m_TempDir;
	uint64_t			  m_MultipartChunkSize;
};

///////////////////////////////////////////////////////////////////////
// FileStorage implementations

// Normalizes ModulePath to an absolute path, derives the metadata and CAS paths,
// and creates the CAS directory eagerly.
FileStorage::FileStorage(std::filesystem::path ModulePath) : m_StoragePath(std::move(ModulePath))
{
	MakeSafeAbsolutePathInPlace(m_StoragePath);
	m_StatePathName = m_StoragePath / "current-state.cbo";
	m_CASPath		= m_StoragePath / "cas";
	CreateDirectories(m_CASPath);
}

// Serializes Data to compact binary and writes it to current-state.cbo.
// NOTE(review): written in place (no temp + rename), so an interrupted write could
// leave a truncated state file - confirm whether WriteFile is atomic.
void
FileStorage::SaveMetadata(const CbObject& Data)
{
	ZEN_TRACE_CPU("FileStorage::SaveMetadata");
	BinaryWriter Output;
	SaveCompactBinary(Output, Data);
	WriteFile(m_StatePathName, IoBuffer(IoBuffer::Wrap, Output.GetData(), Output.GetSize()));
}

// Returns {} when no state file exists. Throws on read failure, and on compact-binary
// validation failure.
CbObject
FileStorage::LoadMetadata()
{
	ZEN_TRACE_CPU("FileStorage::LoadMetadata");
	if (!IsFile(m_StatePathName))
	{
		return {};
	}
	FileContents Content = ReadFile(m_StatePathName);
	if (Content.ErrorCode)
	{
		ThrowSystemError(Content.ErrorCode.value(), "Failed to read state file");
	}
	IoBuffer		Payload = Content.Flatten();
	CbValidateError Error;
	CbObject		Result = ValidateAndReadCompactBinaryObject(std::move(Payload), Error);
	if (Error != CbValidateError::None)
	{
		throw std::runtime_error(fmt::format("Failed to read {} state file. Reason: {}"sv, m_StatePathName, ToString(Error)));
	}
	return Result;
}

// Lists files directly inside cas/. Filenames that do not parse as an IoHash are skipped.
std::vector
FileStorage::List()
{
	DirectoryContent DirContent;
	GetDirectoryContent(m_CASPath, DirectoryContentFlags::IncludeFiles, DirContent);
	std::vector Result;
	Result.reserve(DirContent.Files.size());
	for (const std::filesystem::path& Path : DirContent.Files)
	{
		IoHash Hash;
		if (IoHash::TryParse(Path.filename().string(), Hash))
		{
			Result.push_back(Hash);
		}
	}
	return Result;
}

// Schedules a copy (clone where supported) of SourcePath into cas/<Hash>.
// Skipped if the abort flag is already set when the worker starts.
// Stats.Bytes is only credited after a successful copy. Request timing is recorded
// even when the copy throws, because the guard runs on unwind.
void
FileStorage::Put(ParallelWork& Work, WorkerThreadPool& WorkerPool, const IoHash& Hash, uint64_t Size, const std::filesystem::path& SourcePath, PhaseStats& Stats)
{
	Work.ScheduleWork(
		WorkerPool,
		[this, Hash = IoHash(Hash), Size, SourcePath = std::filesystem::path(SourcePath), &Stats](std::atomic& AbortFlag) {
			ZEN_TRACE_CPU("FileStorage::Put");
			if (!AbortFlag.load())
			{
				Stopwatch			  Timer	   = Stats.BeginRequest();
				std::filesystem::path DestPath = m_CASPath / fmt::format("{}"sv, Hash);
				auto				  GuardEnd = MakeGuard([&] { Stats.EndRequest(Timer.GetElapsedTimeUs()); });
				if (std::error_code Ec = CopyFile(SourcePath, DestPath, CopyFileOptions{.EnableClone = true}); Ec)
				{
					throw std::system_error(Ec, fmt::format("Failed to copy '{}' to '{}'"sv, SourcePath, DestPath));
				}
				Stats.Bytes.fetch_add(Size, std::memory_order_relaxed);
			}
		});
}

// Schedules a copy (clone where supported) of cas/<Hash> to DestinationPath.
// Mirror image of Put.
void
FileStorage::Get(ParallelWork& Work, WorkerThreadPool& WorkerPool, const IoHash& Hash, uint64_t Size, const std::filesystem::path& DestinationPath, PhaseStats& Stats)
{
	Work.ScheduleWork(WorkerPool,
					  [this, Hash = IoHash(Hash), Size, DestinationPath = std::filesystem::path(DestinationPath), &Stats](
						  std::atomic& AbortFlag) {
						  ZEN_TRACE_CPU("FileStorage::Get");
						  if (!AbortFlag.load())
						  {
							  Stopwatch				Timer	   = Stats.BeginRequest();
							  auto					GuardEnd   = MakeGuard([&] { Stats.EndRequest(Timer.GetElapsedTimeUs()); });
							  std::filesystem::path SourcePath = m_CASPath / fmt::format("{}"sv, Hash);
							  if (std::error_code Ec = CopyFile(SourcePath, DestinationPath, CopyFileOptions{.EnableClone = true}); Ec)
							  {
								  throw std::system_error(Ec, fmt::format("Failed to copy '{}' to '{}'"sv, SourcePath, DestinationPath));
							  }
							  Stats.Bytes.fetch_add(Size, std::memory_order_relaxed);
						  }
					  });
}

// Deletes the whole module directory synchronously. Work and WorkerPool are unused
// by this backend.
void
FileStorage::Delete(ParallelWork& Work, WorkerThreadPool& WorkerPool)
{
	ZEN_UNUSED(Work);
	ZEN_UNUSED(WorkerPool);
	DeleteDirectories(m_StoragePath);
}

///////////////////////////////////////////////////////////////////////
// S3Storage implementations

S3Storage::S3Storage(S3Client& Client, std::string KeyPrefix, std::filesystem::path TempDir, uint64_t MultipartChunkSize)
: m_Client(Client)
, m_KeyPrefix(std::move(KeyPrefix))
, m_TempDir(std::move(TempDir))
, m_MultipartChunkSize(MultipartChunkSize)
{
}

// Uploads the compact-binary metadata to <prefix>/incremental-state.cbo.
// The payload is cloned out of the writer's buffer before the PUT.
void
S3Storage::SaveMetadata(const CbObject& Data)
{
	ZEN_TRACE_CPU("S3Storage::SaveMetadata");
	BinaryWriter Output;
	SaveCompactBinary(Output, Data);
	IoBuffer Payload(IoBuffer::Clone, Output.GetData(), Output.GetSize());

	std::string Key	   = m_KeyPrefix + "/incremental-state.cbo";
	S3Result	Result = m_Client.PutObject(Key, std::move(Payload));
	if (!Result.IsSuccess())
	{
		throw zen::runtime_error("Failed to save incremental metadata to '{}': {}"sv, Key, Result.Error);
	}
}

// Returns {} if the metadata object does not exist (matched via NotFoundErrorText).
// Any other failure, or invalid compact binary, throws.
CbObject
S3Storage::LoadMetadata()
{
	ZEN_TRACE_CPU("S3Storage::LoadMetadata");
	std::string		  Key	 = m_KeyPrefix + "/incremental-state.cbo";
	S3GetObjectResult Result = m_Client.GetObject(Key);
	if (!Result.IsSuccess())
	{
		if (Result.Error == S3GetObjectResult::NotFoundErrorText)
		{
			return {};
		}
		throw zen::runtime_error("Failed to load incremental metadata from '{}': {}"sv, Key, Result.Error);
	}

	CbValidateError Error;
	CbObject		Meta = ValidateAndReadCompactBinaryObject(std::move(Result.Content), Error);
	if (Error != CbValidateError::None)
	{
		throw zen::runtime_error("Failed to parse incremental metadata from '{}': {}"sv, Key, ToString(Error));
	}
	return Meta;
}

// Persists the multipart chunk size so hydrate partitions objects the same way.
CbObject
S3Storage::GetSettings()
{
	CbObjectWriter Writer;
	Writer << "MultipartChunkSize"sv << m_MultipartChunkSize;
	return Writer.Save();
}

// NOTE(review): a stored MultipartChunkSize of 0 is accepted as-is (the default only
// applies when AsUInt64 falls back). Presumably that means a missing or non-integer field
// - TODO confirm. A zero chunk size makes the ranged-download loop in Get
// non-terminating; see the note there.
void
S3Storage::ParseSettings(const CbObjectView& Settings)
{
	m_MultipartChunkSize = Settings["MultipartChunkSize"sv].AsUInt64(DefaultMultipartChunkSize);
}

// Lists objects under <prefix>/cas/ and returns the hashes parsed from the last key
// segment. Keys whose last segment is not an IoHash are skipped.
std::vector
S3Storage::List()
{
	std::string			CasPrefix = m_KeyPrefix + "/cas/";
	S3ListObjectsResult Result	  = m_Client.ListObjects(CasPrefix);
	if (!Result.IsSuccess())
	{
		throw zen::runtime_error("Failed to list S3 objects under '{}': {}"sv, CasPrefix, Result.Error);
	}

	std::vector Hashes;
	Hashes.reserve(Result.Objects.size());
	for (const S3ObjectInfo& Obj : Result.Objects)
	{
		size_t LastSlash = Obj.Key.rfind('/');
		if (LastSlash == std::string::npos)
		{
			continue;
		}
		IoHash Hash;
		if (IoHash::TryParse(Obj.Key.substr(LastSlash + 1), Hash))
		{
			Hashes.push_back(Hash);
		}
	}
	return Hashes;
}

// Schedules the upload of SourcePath to <prefix>/cas/<Hash>.
// Files of at least 1.25 x m_MultipartChunkSize go through PutObjectMultipart, which
// reads ranges on demand from the open file. Smaller files are read with ReadAll()
// and sent with a single PutObject.
void
S3Storage::Put(ParallelWork& Work, WorkerThreadPool& WorkerPool, const IoHash& Hash, uint64_t Size, const std::filesystem::path& SourcePath, PhaseStats& Stats)
{
	Work.ScheduleWork(
		WorkerPool,
		[this, Hash = IoHash(Hash), Size, SourcePath = std::filesystem::path(SourcePath), &Stats](std::atomic& AbortFlag) {
			ZEN_TRACE_CPU("S3Storage::Put");
			if (AbortFlag.load())
			{
				return;
			}
			Stopwatch Timer	   = Stats.BeginRequest();
			auto	  GuardEnd = MakeGuard([&] { Stats.EndRequest(Timer.GetElapsedTimeUs()); });

			S3Client&	Client = m_Client;
			std::string Key	   = m_KeyPrefix + "/cas/" + fmt::format("{}"sv, Hash);

			if (Size >= (m_MultipartChunkSize + (m_MultipartChunkSize / 4)))
			{
				BasicFile File(SourcePath, BasicFile::Mode::kRead);
				S3Result  Result = Client.PutObjectMultipart(
					 Key,
					 Size,
					 [&File](uint64_t Offset, uint64_t ChunkSize) { return File.ReadRange(Offset, ChunkSize); },
					 m_MultipartChunkSize);
				if (!Result.IsSuccess())
				{
					throw zen::runtime_error("Failed to upload '{}' to S3: {}"sv, Key, Result.Error);
				}
			}
			else
			{
				BasicFile File(SourcePath, BasicFile::Mode::kRead);
				S3Result  Result = Client.PutObject(Key, File.ReadAll());
				if (!Result.IsSuccess())
				{
					throw zen::runtime_error("Failed to upload '{}' to S3: {}"sv, Key, Result.Error);
				}
			}
			Stats.Bytes.fetch_add(Size, std::memory_order_relaxed);
		});
}

// Downloads <prefix>/cas/<Hash> into DestinationPath.
//
// Large objects (>= 1.25 x m_MultipartChunkSize): DestinationPath is opened (truncated)
// and pre-sized synchronously on the calling thread. One ranged GET per chunk is then
// scheduled, and each writes its bytes at its own offset. The WorkData is shared by
// every chunk lambda, so the destination file is flushed when the last lambda releases
// it.
//
// Small objects: a single GET (given m_TempDir, presumably for spooling - TODO confirm).
// If the result is file-backed, the temp file is renamed into place, falling back to a
// copy via WriteFile. Otherwise the in-memory content is written out.
void
S3Storage::Get(ParallelWork& Work, WorkerThreadPool& WorkerPool, const IoHash& Hash, uint64_t Size, const std::filesystem::path& DestinationPath, PhaseStats& Stats)
{
	std::string Key = m_KeyPrefix + "/cas/" + fmt::format("{}"sv, Hash);

	if (Size >= (m_MultipartChunkSize + (m_MultipartChunkSize / 4)))
	{
		// Destination file shared by all ranged-GET lambdas of one object.
		// NOTE(review): the destructor is implicitly noexcept. If BasicFile::Flush can
		// throw, that would terminate the process - TODO confirm.
		class WorkData
		{
		public:
			WorkData(const std::filesystem::path& DestPath, uint64_t Size) : m_DestFile(DestPath, BasicFile::Mode::kTruncate)
			{
				PrepareFileForScatteredWrite(m_DestFile.Handle(), Size);
			}
			~WorkData() { m_DestFile.Flush(); }
			void Write(const void* Data, uint64_t Size, uint64_t Offset) { m_DestFile.Write(Data, Size, Offset); }

		private:
			BasicFile m_DestFile;
		};

		std::shared_ptr Data = std::make_shared(DestinationPath, Size);

		// NOTE(review): if m_MultipartChunkSize is 0, this branch is always taken and
		// ChunkSize is always 0, so Offset never advances and this loop does not
		// terminate. TODO: reject or replace a zero chunk size.
		uint64_t Offset = 0;
		while (Offset < Size)
		{
			uint64_t ChunkSize = std::min(m_MultipartChunkSize, Size - Offset);

			Work.ScheduleWork(WorkerPool, [this, Key = Key, Offset, ChunkSize, Data, &Stats](std::atomic& AbortFlag) {
				ZEN_TRACE_CPU("S3Storage::GetRange");
				if (AbortFlag.load())
				{
					return;
				}
				Stopwatch Timer	   = Stats.BeginRequest();
				auto	  GuardEnd = MakeGuard([&] { Stats.EndRequest(Timer.GetElapsedTimeUs()); });

				S3GetObjectResult Chunk = m_Client.GetObjectRange(Key, Offset, ChunkSize);
				if (!Chunk.IsSuccess())
				{
					throw zen::runtime_error("Failed to download '{}' bytes [{}-{}] from S3: {}"sv,
											 Key,
											 Offset,
											 Offset + ChunkSize - 1,
											 Chunk.Error);
				}

				// NOTE(review): the returned size is not compared with ChunkSize. A short
				// response would leave a hole in the file, while Stats.Bytes is still
				// credited with the full ChunkSize.
				Data->Write(Chunk.Content.GetData(), Chunk.Content.GetSize(), Offset);
				Stats.Bytes.fetch_add(ChunkSize, std::memory_order_relaxed);
			});
			Offset += ChunkSize;
		}
	}
	else
	{
		Work.ScheduleWork(
			WorkerPool,
			[this, Key = Key, Size, DestinationPath = std::filesystem::path(DestinationPath), &Stats](std::atomic& AbortFlag) {
				ZEN_TRACE_CPU("S3Storage::Get");
				if (AbortFlag.load())
				{
					return;
				}
				Stopwatch Timer	   = Stats.BeginRequest();
				auto	  GuardEnd = MakeGuard([&] { Stats.EndRequest(Timer.GetElapsedTimeUs()); });

				S3GetObjectResult Chunk = m_Client.GetObject(Key, m_TempDir);
				if (!Chunk.IsSuccess())
				{
					throw zen::runtime_error("Failed to download '{}' from S3: {}"sv, Key, Chunk.Error);
				}

				if (IoBufferFileReference FileRef; Chunk.Content.GetFileReference(FileRef))
				{
					std::error_code		  Ec;
					std::filesystem::path ChunkPath = PathFromHandle(FileRef.FileHandle, Ec);
					if (Ec)
					{
						WriteFile(DestinationPath, Chunk.Content);
					}
					else
					{
						// Keep the backing file alive past buffer release, then try to move it into place.
						Chunk.Content.SetDeleteOnClose(false);
						Chunk.Content = {};
						RenameFile(ChunkPath, DestinationPath, Ec);
						if (Ec)
						{
							// Rename failed: reopen the temp file (delete-on-close again) and copy its bytes.
							Chunk.Content = IoBufferBuilder::MakeFromFile(ChunkPath);
							Chunk.Content.SetDeleteOnClose(true);
							WriteFile(DestinationPath, Chunk.Content);
						}
					}
				}
				else
				{
					WriteFile(DestinationPath, Chunk.Content);
				}
				Stats.Bytes.fetch_add(Size, std::memory_order_relaxed);
			});
	}
}

// Schedules S3Client::Touch on <prefix>/cas/<Hash> and throws on failure.
// Stats.Bytes is not updated here.
void
S3Storage::Touch(ParallelWork& Work, WorkerThreadPool& WorkerPool, const IoHash& Hash, PhaseStats& Stats)
{
	Work.ScheduleWork(WorkerPool, [this, Hash = IoHash(Hash), &Stats](std::atomic& AbortFlag) {
		ZEN_TRACE_CPU("S3Storage::Touch");
		if (AbortFlag.load())
		{
			return;
		}
		Stopwatch Timer	   = Stats.BeginRequest();
		auto	  GuardEnd = MakeGuard([&] { Stats.EndRequest(Timer.GetElapsedTimeUs()); });

		std::string Key	   = m_KeyPrefix + "/cas/" + fmt::format("{}"sv, Hash);
		S3Result	Result = m_Client.Touch(Key);
		if (!Result.IsSuccess())
		{
			throw zen::runtime_error("Failed to touch '{}' in S3: {}"sv, Key, Result.Error);
		}
	});
}

// Lists every object under "<prefix>/" synchronously, then schedules one DeleteObject
// per key.
// NOTE(review): only as complete as the listing. Whether ListObjects follows
// continuation/pagination is not visible here.
void
S3Storage::Delete(ParallelWork& Work, WorkerThreadPool& WorkerPool)
{
	std::string			ModulePrefix = m_KeyPrefix + "/";
	S3ListObjectsResult ListResult	 = m_Client.ListObjects(ModulePrefix);
	if (!ListResult.IsSuccess())
	{
		throw zen::runtime_error("Failed to list S3 objects for deletion under '{}': {}"sv, ModulePrefix, ListResult.Error);
	}

	for (const S3ObjectInfo& Obj : ListResult.Objects)
	{
		Work.ScheduleWork(WorkerPool, [this, Key = Obj.Key](std::atomic& AbortFlag) {
			if (AbortFlag.load())
			{
				return;
			}
			S3Result DelResult = m_Client.DeleteObject(Key);
			if (!DelResult.IsSuccess())
			{
				throw zen::runtime_error("Failed to delete S3 object '{}': {}"sv, Key, DelResult.Error);
			}
		});
	}
}

///////////////////////////////////////////////////////////////////////
// IncrementalHydrator: the only HydrationStrategyBase implementation.

// Summary emission for hydrate/dehydrate operations.

// Queue-wait helper: time between earliest schedule and earliest worker start. UINT64_MAX
// sentinels mean the corresponding event never happened (no work scheduled / nothing ran).
inline uint64_t
QueueWaitUs(uint64_t FirstScheduleUs, uint64_t FirstStartUs)
{
	if (FirstScheduleUs == UINT64_MAX || FirstStartUs == UINT64_MAX || FirstStartUs <= FirstScheduleUs)
	{
		return 0;
	}
	return FirstStartUs - FirstScheduleUs;
}

// Integer average that yields 0 instead of dividing by zero.
inline uint64_t
SafeAvg(uint64_t Total, uint64_t Count)
{
	return Count ? Total / Count : 0;
}

// Emits one multi-line ZEN_INFO summary of a dehydrate run from Stats.
// The line order is: totals, load, scan, hash, list, pack, upload, save, clean.
void
LogDehydrateSummary(std::string_view			 Prefix,
					const DehydrateStatistics&	 Stats,
					std::string_view			 ModuleId,
					const std::filesystem::path& Source,
					std::string_view			 Target)
{
	const uint64_t TotalFiles = Stats.TotalFiles.load();
	const uint64_t HashUs	  = Stats.Hash.ElapsedUs.load();
	const uint64_t UploadUs	  = Stats.Upload.ElapsedUs.load();

	// Hash phase: per-request data (BeginRequest/EndRequest tracks each hash op).
	// Hash is the first phase to schedule work, so cold-pool warm-up shows up here.
	const uint64_t HashFiles	  = Stats.Hash.Files.load();
	const uint64_t HashBytes	  = Stats.Hash.Bytes.load();
	const uint64_t HashReqCount	  = Stats.Hash.RequestCount.load();
	const uint64_t HashReqTotalUs = Stats.Hash.RequestTotalUs.load();
	const uint64_t HashReqMaxUs	  = Stats.Hash.RequestMaxUs.load();
	const uint32_t HashPeak		  = Stats.Hash.InFlightPeak.load();
	const uint64_t HashQueueUs	  = QueueWaitUs(Stats.Hash.FirstScheduleUs.load(), Stats.Hash.FirstStartUs.load());

	// Cache hit rate: every TotalFile not re-hashed was served from the cached state.
	const uint32_t CacheHitPct = TotalFiles ? gsl::narrow_cast((TotalFiles - HashFiles) * 100 / TotalFiles) : 0;

	// Upload phase shares one ParallelWork across loose Put (Stats.Upload), pack-blob Put
	// (Stats.PackUpload), loose Touch (Stats.Touch), and pack-blob Touch (Stats.PackTouch).
	// Per-request data is collected per PhaseStats by Storage::Put / Storage::Touch and
	// reported as a single combined "Requests" line.
	const uint64_t UpReqCount = Stats.Upload.RequestCount.load() + Stats.PackUpload.RequestCount.load() +
								Stats.Touch.RequestCount.load() + Stats.PackTouch.RequestCount.load();
	const uint64_t UpReqTotalUs = Stats.Upload.RequestTotalUs.load() + Stats.PackUpload.RequestTotalUs.load() +
								  Stats.Touch.RequestTotalUs.load() + Stats.PackTouch.RequestTotalUs.load();
	const uint64_t UpReqMaxUs = std::max({Stats.Upload.RequestMaxUs.load(),
										  Stats.PackUpload.RequestMaxUs.load(),
										  Stats.Touch.RequestMaxUs.load(),
										  Stats.PackTouch.RequestMaxUs.load()});
	const uint32_t UpPeak	  = std::max({Stats.Upload.InFlightPeak.load(),
										  Stats.PackUpload.InFlightPeak.load(),
										  Stats.Touch.InFlightPeak.load(),
										  Stats.PackTouch.InFlightPeak.load()});

	// Combined first-schedule / first-start across all four phase counters. UINT64_MAX is
	// the unset sentinel and naturally loses to any real timestamp under std::min, so empty
	// phases fall through to the others.
	const uint64_t UpFirstSchedUs = std::min({Stats.Upload.FirstScheduleUs.load(),
											  Stats.PackUpload.FirstScheduleUs.load(),
											  Stats.Touch.FirstScheduleUs.load(),
											  Stats.PackTouch.FirstScheduleUs.load()});
	const uint64_t UpFirstStartUs = std::min({Stats.Upload.FirstStartUs.load(),
											  Stats.PackUpload.FirstStartUs.load(),
											  Stats.Touch.FirstStartUs.load(),
											  Stats.PackTouch.FirstStartUs.load()});
	const uint64_t UpQueueUs	  = QueueWaitUs(UpFirstSchedUs, UpFirstStartUs);

	const uint64_t LooseFiles	  = Stats.Upload.Files.load();
	const uint64_t LooseBytes	  = Stats.Upload.Bytes.load();
	const uint64_t TouchFiles	  = Stats.Touch.Files.load();
	const uint64_t TouchBytes	  = Stats.Touch.Bytes.load();
	const uint64_t PackTouchPacks = Stats.PackTouch.Files.load();
	const uint64_t PackTouchBytes = Stats.PackTouch.Bytes.load();

	const uint64_t PackCount	   = Stats.PackCount.load();
	const uint64_t PackedFiles	   = Stats.PackedFiles.load();
	const uint64_t PackBytes	   = Stats.PackBytes.load();
	const uint64_t PackBuildUs	   = Stats.PackBuildUs.load();
	const uint64_t PackUploadFiles = Stats.PackUpload.Files.load();
	const uint64_t PackUploadBytes = Stats.PackUpload.Bytes.load();

	ZEN_INFO(
		"{} module '{}': {} files ({}) in {}\n"
		" Source: {}\n"
		" Target: {}\n"
		" Load state: {}\n"
		" Dir scan: {}\n"
		" Hash: {} {}/{} files ({}) hashed, {}% cache hit, {}bits/s\n"
		" Requests: {} reqs, avg {}/req, max {}/req, peak in-flight {}, queue wait {}\n"
		" List existing: {}\n"
		" Pack: {} {} packs, {} files, {}, {}bits/s\n"
		" Upload: {} loose {} files ({}), packed {} blobs ({}), touched {} loose ({}) + {} packs ({}), {}bits/s\n"
		" Requests: {} reqs, avg {}/req, max {}/req, peak in-flight {}, queue wait {}\n"
		" Save metadata: {}\n"
		" Clean: {}",
		Prefix,
		ModuleId,
		ThousandsNum(TotalFiles),
		NiceBytes(Stats.TotalBytes.load()),
		NiceTimeSpanUs(Stats.TotalUs.load()),
		Source.generic_string(),
		Target,
		NiceTimeSpanUs(Stats.LoadStateUs.load()),
		NiceTimeSpanUs(Stats.DirScanUs.load()),
		NiceTimeSpanUs(HashUs),
		ThousandsNum(HashFiles),
		ThousandsNum(TotalFiles),
		NiceBytes(HashBytes),
		CacheHitPct,
		NiceNum(BitsPerSecond(HashBytes, HashUs)),
		ThousandsNum(HashReqCount),
		NiceTimeSpanUs(SafeAvg(HashReqTotalUs, HashReqCount)),
		NiceTimeSpanUs(HashReqMaxUs),
		HashPeak,
		NiceTimeSpanUs(HashQueueUs),
		NiceTimeSpanUs(Stats.ListExistingUs.load()),
		NiceTimeSpanUs(PackBuildUs),
		ThousandsNum(PackCount),
		ThousandsNum(PackedFiles),
		NiceBytes(PackBytes),
		NiceNum(BitsPerSecond(PackBytes, PackBuildUs)),
		NiceTimeSpanUs(UploadUs),
		ThousandsNum(LooseFiles),
		NiceBytes(LooseBytes),
		ThousandsNum(PackUploadFiles),
		NiceBytes(PackUploadBytes),
		ThousandsNum(TouchFiles),
		NiceBytes(TouchBytes),
		ThousandsNum(PackTouchPacks),
		NiceBytes(PackTouchBytes),
		NiceNum(BitsPerSecond(LooseBytes + PackUploadBytes, UploadUs)),
		ThousandsNum(UpReqCount),
		NiceTimeSpanUs(SafeAvg(UpReqTotalUs, UpReqCount)),
		NiceTimeSpanUs(UpReqMaxUs),
		UpPeak,
		NiceTimeSpanUs(UpQueueUs),
		NiceTimeSpanUs(Stats.SaveMetadataUs.load()),
		NiceTimeSpanUs(Stats.CleanUs.load()));
}

// Emits one multi-line ZEN_INFO summary of a hydrate run from Stats.
// The line order is: totals, metadata, dirs, download, unpack, clean, finalize, build state.
void
LogHydrateSummary(std::string_view			   Prefix,
				  const HydrateStatistics&	   Stats,
				  std::string_view			   ModuleId,
				  std::string_view			   Source,
				  const std::filesystem::path& Target)
{
	const uint64_t DownloadUs = Stats.Download.ElapsedUs.load();

	// Standalone (Stats.Download) and pack (Stats.PackDownload) downloads share one
	// ParallelWork. Per-request data is collected per PhaseStats by Storage::Get;
	// reported as a single combined "Requests" line. Multipart GETs may split a pack
	// into several ranged requests, so DlReqCount can exceed file/blob counts.
	const uint64_t StandaloneFiles = Stats.Download.Files.load();
	const uint64_t StandaloneBytes = Stats.Download.Bytes.load();
	const uint64_t PackDlFiles	   = Stats.PackDownload.Files.load();
	const uint64_t PackDlBytes	   = Stats.PackDownload.Bytes.load();

	const uint64_t DlReqCount	  = Stats.Download.RequestCount.load() + Stats.PackDownload.RequestCount.load();
	const uint64_t DlReqTotalUs	  = Stats.Download.RequestTotalUs.load() + Stats.PackDownload.RequestTotalUs.load();
	const uint64_t DlReqMaxUs	  = std::max(Stats.Download.RequestMaxUs.load(), Stats.PackDownload.RequestMaxUs.load());
	const uint32_t DlPeak		  = std::max(Stats.Download.InFlightPeak.load(), Stats.PackDownload.InFlightPeak.load());
	const uint64_t DlFirstSchedUs = std::min(Stats.Download.FirstScheduleUs.load(), Stats.PackDownload.FirstScheduleUs.load());
	const uint64_t DlFirstStartUs = std::min(Stats.Download.FirstStartUs.load(), Stats.PackDownload.FirstStartUs.load());
	const uint64_t QueueUs		  = QueueWaitUs(DlFirstSchedUs, DlFirstStartUs);

	const uint64_t PackCount		= Stats.PackCount.load();
	const uint64_t PackedFiles		= Stats.PackedFiles.load();
	const uint64_t PackUnpackUs		= Stats.PackUnpackUs.load();
	const uint64_t UnpackWriteBytes = Stats.UnpackWriteBytes.load();
	const uint64_t CreateDirsUs		= Stats.CreateDirsUs.load();
	const uint64_t CreateDirsCount	= Stats.CreateDirsCount.load();
	// Directories per second (0 when the phase took no measurable time).
	const uint64_t CreateDirsRate = CreateDirsUs ? (CreateDirsCount * 1'000'000ull / CreateDirsUs) : 0;

	// Standalone and pack downloads share the Download phase elapsed reported below;
	// the unpack line is its own clock (slice + parallel SafeWriteFile).
	ZEN_INFO(
		"{} module '{}': {} files ({}) in {}\n"
		" Source: {}\n"
		" Target: {}\n"
		" Load metadata: {}\n"
		" Create dirs: {} {} dirs, {} dirs/s\n"
		" Download: {} loose {} files ({}), packed {} blobs ({}), {}bits/s\n"
		" Requests: {} reqs, avg {}/req, max {}/req, peak in-flight {}, queue wait {}\n"
		" Unpack: {} {} packs, {} files ({}), {}bits/s\n"
		" Clean: {}\n"
		" Finalize: {}\n"
		" Build state: {}",
		Prefix,
		ModuleId,
		ThousandsNum(Stats.TotalFiles.load()),
		NiceBytes(Stats.TotalBytes.load()),
		NiceTimeSpanUs(Stats.TotalUs.load()),
		Source,
		Target.generic_string(),
		NiceTimeSpanUs(Stats.LoadMetadataUs.load()),
		NiceTimeSpanUs(CreateDirsUs),
		ThousandsNum(CreateDirsCount),
		NiceNum(CreateDirsRate),
		NiceTimeSpanUs(DownloadUs),
		ThousandsNum(StandaloneFiles),
		NiceBytes(StandaloneBytes),
		ThousandsNum(PackDlFiles),
		NiceBytes(PackDlBytes),
		NiceNum(BitsPerSecond(StandaloneBytes + PackDlBytes, DownloadUs)),
		ThousandsNum(DlReqCount),
		NiceTimeSpanUs(SafeAvg(DlReqTotalUs, DlReqCount)),
		NiceTimeSpanUs(DlReqMaxUs),
		DlPeak,
		NiceTimeSpanUs(QueueUs),
		NiceTimeSpanUs(PackUnpackUs),
		ThousandsNum(PackCount),
		ThousandsNum(PackedFiles),
		NiceBytes(UnpackWriteBytes),
		NiceNum(BitsPerSecond(UnpackWriteBytes, PackUnpackUs)),
		NiceTimeSpanUs(Stats.CleanUs.load()),
		NiceTimeSpanUs(Stats.FinalizeUs.load()),
		NiceTimeSpanUs(Stats.BuildStateUs.load()));
}

///////////////////////////////////////////////////////////////////////
// File-manifest entry: one of these per file in a module's state. Lives in
// the namespace (rather than nested in IncrementalHydrator) so the helper
// functions below can take it by reference.
// NOTE(review): Size and ModTick have no default member initializers. Construct with
// designated initializers (as LoadCachedStateEntries does) so they are always set.
struct Entry
{
	std::string RelativePath;
	uint64_t	Size;
	uint64_t	ModTick;
	IoHash		Hash;
	bool		IsPacked = false;  // true if content is part of a pack (PackHash valid)
	// Hash of the pack's concatenated raw bytes (= pack's CAS key). Hydrate downloads
	// the pack once and slices this entry out at the offset recorded in Pack.Entries[]
	// for the matching Entry.Hash.
	IoHash PackHash;
};

///////////////////////////////////////////////////////////////////////
// Pack types: produced by dehydrate's pack phase, consumed by the state
// writer; consumed during hydrate to reconstruct slices.

// One member blob of a pack built during dehydrate.
struct BuiltPackEntry
{
	IoHash	 Hash;
	uint64_t Size;
};

// A pack built during dehydrate: its CAS key, total byte size, and member blobs.
struct BuiltPack
{
	IoHash		PackHash;
	uint64_t	Size;
	std::vector Entries;
};

// One member blob of a pack as described in state: where its bytes start inside the pack.
struct PackEntryDescriptor
{
	IoHash	 Hash;
	uint64_t Size;
	uint64_t Offset;
};

// A pack as described in state during hydrate: total size plus member descriptors.
struct PackDescriptor
{
	uint64_t	Size = 0;
	std::vector Entries;
};

// Grouping of manifest entries into packs. The element types are not visible in this
// view - TODO confirm (presumably one EntryGroup per pack, PackPlan = all groups).
using EntryGroup = std::vector;
using PackPlan	 = std::vector;

///////////////////////////////////////////////////////////////////////
// Holds a per-module StorageBase and threading context; drives the
// hydrate/dehydrate algorithm.
class IncrementalHydrator : public HydrationStrategyBase
{
public:
	IncrementalHydrator(const HydrationConfig& Config, std::unique_ptr Storage, std::span Excludes);
	virtual ~IncrementalHydrator() override;

	virtual void	 Dehydrate(const CbObject& CachedState) override;
	virtual CbObject Hydrate() override;
	virtual void	 Obliterate() override;

private:
	std::unique_ptr m_Storage;
	HydrationConfig m_Config;
	std::vector		m_Excludes;

	// Fallback threading context. m_Threading points at these by default and must stay
	// declared after them. Whether the constructor replaces it with Config-provided
	// options is not visible here - TODO confirm.
	WorkerThreadPool				  m_FallbackWorkPool;
	std::atomic						  m_FallbackAbortFlag{false};
	std::atomic						  m_FallbackPauseFlag{false};
	HydrationConfig::ThreadingOptions m_Threading{.WorkerPool = &m_FallbackWorkPool,
												  .AbortFlag  = &m_FallbackAbortFlag,
												  .PauseFlag  = &m_FallbackPauseFlag};
};

///////////////////////////////////////////////////////////////////////
// Phase helpers used by IncrementalHydrator::Dehydrate and ::Hydrate.
// These keep the two big member functions readable as a sequence of
// named phases. Helpers take only the data they need and never the
// full HydrationConfig or IncrementalHydrator.

// Removes each path. A removal failure is logged as a warning and does not stop the
// remaining removals or propagate. Used by both Dehydrate and Hydrate pack-cleanup
// guards plus the explicit pre-rename cleanup on Hydrate.
void RemoveStagedPackFiles(const std::vector& Files) { for (const std::filesystem::path& P : Files) { std::error_code Ec; RemoveFile(P, Ec); if (Ec) { ZEN_WARN("Failed to remove staged pack file '{}': {}", P, Ec.message()); } } } // Collects parent_path() of each input, sorts lex ascending (= ancestor-first), // uniques, then drops any entry that is a strict component-prefix of the next // entry (its descendant's CreateDirectories recursion will create it). Calls // CreateDirectories on the surviving leaves. Use before scheduling parallel // writes so worker threads do not race to create the same parents. size_t CreateParentDirectories(const std::vector& FilePaths) { if (FilePaths.empty()) { return 0; } std::vector Dirs; Dirs.reserve(FilePaths.size()); for (const std::filesystem::path& File : FilePaths) { if (File.has_parent_path()) { Dirs.push_back(File.parent_path()); } } if (Dirs.empty()) { return 0; } std::sort(Dirs.begin(), Dirs.end()); Dirs.erase(std::unique(Dirs.begin(), Dirs.end()), Dirs.end()); size_t Write = 0; for (size_t Read = 0; Read < Dirs.size(); ++Read) { if (Read + 1 < Dirs.size()) { const std::filesystem::path& Cur = Dirs[Read]; const std::filesystem::path& Next = Dirs[Read + 1]; const auto [ItCur, ItNext] = std::mismatch(Cur.begin(), Cur.end(), Next.begin(), Next.end()); if (ItCur == Cur.end() && ItNext != Next.end()) { continue; // Cur is component-prefix of Next; descendant will create it } } if (Write != Read) { Dirs[Write] = std::move(Dirs[Read]); } ++Write; } Dirs.resize(Write); for (const std::filesystem::path& Dir : Dirs) { CreateDirectories(Dir); } return Dirs.size(); } // Parses CachedState["Files"sv] into a path-keyed lookup + parallel Entries vector. // Used by Dehydrate to seed its hash cache; Dehydrate ignores PackHash here. 
void LoadCachedStateEntries(const CbObject& CachedState, std::unordered_map& OutLookup, std::vector& OutEntries) { for (CbFieldView FieldView : CachedState["Files"sv].AsArrayView()) { CbObjectView EntryView = FieldView.AsObjectView(); std::string RelativePath(EntryView["Path"sv].AsString()); uint64_t Size = EntryView["Size"sv].AsUInt64(); uint64_t ModTick = EntryView["ModTick"sv].AsUInt64(); IoHash Hash = EntryView["Hash"sv].AsHash(); OutLookup.insert_or_assign(RelativePath, OutEntries.size()); OutEntries.push_back(Entry{.RelativePath = std::move(RelativePath), .Size = Size, .ModTick = ModTick, .Hash = Hash}); } } // Computes Out.Hash for a single file. For Oodle-compressed files in a `cas/` subdir // the hash is a meta-hash combining the embedded RawHash with the file size, which // avoids a collision between an uncompressed file and a same-content compressed file. // All other files use a streaming raw hash via BasicFile + IoHashStream (sequential // read, friendlier to the Windows cache manager than mmap). void HashFileContent(const std::filesystem::path& AbsPath, Entry& Out) { if (AbsPath.extension().empty()) { std::string_view Rel = Out.RelativePath; std::string_view First = Rel.substr(0, Rel.find('/')); if (First.ends_with("cas")) { IoHash RawHash; uint64_t RawSize; CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(IoBufferBuilder::MakeFromFile(AbsPath)), RawHash, RawSize); if (Compressed) { IoHashStream Hasher; Hasher.Append(RawHash.Hash, sizeof(RawHash.Hash)); Hasher.Append(&Out.Size, sizeof(Out.Size)); Out.Hash = Hasher.GetHash(); return; } } } BasicFile File(AbsPath, BasicFile::Mode::kRead); IoHashStream Hasher; File.StreamFile([&Hasher](const void* Data, uint64_t Size) { Hasher.Append(Data, Size); }); Out.Hash = Hasher.GetHash(); } // Walks DirContent, fills Entries[], schedules hash work for files whose hash // is not in the StateEntries cache. The caller owns the ParallelWork's Wait() // so other operations (e.g. 
Storage::List) can overlap with hashing. Files whose // relative path matches any pattern in Excludes are dropped here (the hub-wide // default list - see DefaultExcludes() above - covers transient runtime files // like .lock and .sentry-native; the user can override via HydrationOptions). // Returns the number of accepted (non-filtered) entries; OutTotalBytes accumulates // their sizes. size_t ScanAndScheduleHashWork(const DirectoryContent& DirContent, const std::filesystem::path& ServerStateDir, const std::unordered_map& StateEntryLookup, const std::vector& StateEntries, std::span Excludes, std::vector& Entries, uint64_t& OutTotalBytes, ParallelWork& Work, WorkerThreadPool& Pool, PhaseStats& HashStats) { size_t TotalFiles = 0; OutTotalBytes = 0; for (size_t FileIndex = 0; FileIndex < DirContent.Files.size(); FileIndex++) { const std::filesystem::path RelativePath = FastRelativePath(ServerStateDir, DirContent.Files[FileIndex]); std::string RelKey = RelativePath.generic_string(); if (IsExcluded(RelKey, Excludes)) { continue; } const std::filesystem::path AbsPath = MakeSafeAbsolutePath(DirContent.Files[FileIndex]); Entry& CurrentEntry = Entries[TotalFiles]; CurrentEntry.RelativePath = std::move(RelKey); CurrentEntry.Size = DirContent.FileSizes[FileIndex]; CurrentEntry.ModTick = DirContent.FileModificationTicks[FileIndex]; bool FoundHash = false; if (auto KnownIt = StateEntryLookup.find(CurrentEntry.RelativePath); KnownIt != StateEntryLookup.end()) { const Entry& StateEntry = StateEntries[KnownIt->second]; if (StateEntry.Size == CurrentEntry.Size && StateEntry.ModTick == CurrentEntry.ModTick) { CurrentEntry.Hash = StateEntry.Hash; FoundHash = true; } } if (!FoundHash) { Work.ScheduleWork(Pool, [AbsPath, EntryIndex = TotalFiles, &Entries, &HashStats](std::atomic& AbortFlag) { if (AbortFlag.load()) { return; } Stopwatch Timer = HashStats.BeginRequest(); auto GuardEnd = MakeGuard([&] { HashStats.EndRequest(Timer.GetElapsedTimeUs()); }); Entry& CurrentEntry = 
Entries[EntryIndex]; HashFileContent(AbsPath, CurrentEntry); HashStats.Bytes.fetch_add(CurrentEntry.Size, std::memory_order_relaxed); }); HashStats.Files.fetch_add(1, std::memory_order_relaxed); HashStats.RecordScheduled(); } TotalFiles++; OutTotalBytes += CurrentEntry.Size; } return TotalFiles; } // Plans pack composition deterministically: groups Entries by content hash for // candidates Size < Threshold, sorts groups by ascending IoHash, bin-packs greedily // up to MaxPackBytes, and discards any pack with fewer than two entries. Sets // `IsPacked = true` on every entry that survives into a published pack so the caller // can immediately distinguish loose-CAS uploads from pack-bound uploads. // Returns one PackPlan per pack to build (empty if no packs are produced). std::vector PlanPacks(std::vector& Entries, uint64_t Threshold, uint64_t MaxPackBytes) { // 1. Group small-file Entries[] indices by content hash. Every index in a group // shares the same bytes, so any one of them sources the pack content; all of // them get tagged IsPacked once the pack hash is known. std::unordered_map UniqueMap; for (size_t Index = 0; Index < Entries.size(); ++Index) { if (Entries[Index].Size >= Threshold) { continue; } UniqueMap[Entries[Index].Hash].push_back(Index); } // Need at least 2 unique groups for any pack to survive the "discard 1-entry packs" rule. if (UniqueMap.size() < 2) { return {}; } auto GroupHash = [&](const EntryGroup& G) -> const IoHash& { return Entries[G.front()].Hash; }; auto GroupSize = [&](const EntryGroup& G) -> uint64_t { return Entries[G.front()].Size; }; // 2. Deterministic order: ascending IoHash. Drain the map so the index vectors move. std::vector Ordered; Ordered.reserve(UniqueMap.size()); for (auto& [h, g] : UniqueMap) { Ordered.push_back(std::move(g)); } std::sort(Ordered.begin(), Ordered.end(), [&](const EntryGroup& A, const EntryGroup& B) { return GroupHash(A) < GroupHash(B); }); // 3. Bin-pack greedily under MaxPackBytes. 
std::vector Plans; PackPlan Current; uint64_t CurrentSize = 0; for (EntryGroup& Group : Ordered) { const uint64_t Size = GroupSize(Group); if (Size >= MaxPackBytes) { continue; // fallback to standalone upload } if (CurrentSize + Size > MaxPackBytes && !Current.empty()) { if (Current.size() >= 2) { Plans.push_back(std::move(Current)); } Current = {}; CurrentSize = 0; } Current.push_back(std::move(Group)); CurrentSize += Size; } if (Current.size() >= 2) { Plans.push_back(std::move(Current)); } // Tag entries that survived into a published pack so the loose-upload loop can skip // them. Done after bin-packing so groups discarded by the <2-entry rule are not tagged. for (const PackPlan& Plan : Plans) { for (const EntryGroup& Group : Plan) { for (size_t Idx : Group) { Entries[Idx].IsPacked = true; } } } return Plans; } // Reads each source file in Plan, hashes the concatenation, writes raw bytes to // TempPath. Throws on size mismatch (message includes ModuleId for grep). Scratch // is owned by the caller and reused across packs; size must be >= the largest // candidate file (i.e. the pack threshold). BuiltPack BuildPack(const PackPlan& Plan, const std::vector& Entries, const std::filesystem::path& ServerStateDir, const std::filesystem::path& TempPath, std::string_view ModuleId, std::vector& Scratch) { BuiltPack BP; BP.Entries.reserve(Plan.size()); IoHashStream Hasher; uint64_t Offset = 0; { BasicFile PackFile(TempPath, BasicFile::Mode::kTruncate); BasicFileWriter Writer(PackFile, /*BufferSize*/ 64 * 1024); for (const EntryGroup& Group : Plan) { // Every Entries[idx] in a group shares the same content hash (= same bytes), // so the first one is a fine source. 
const Entry& Rep = Entries[Group.front()]; std::filesystem::path AbsPath = MakeSafeAbsolutePath(ServerStateDir / Rep.RelativePath); BasicFile Src(AbsPath, BasicFile::Mode::kRead); const uint64_t Size = Src.FileSize(); if (Size != Rep.Size || Size > Scratch.size()) { throw zen::runtime_error("Pack entry for hash {} (module '{}'): expected {} bytes, file is {} at '{}'"sv, Rep.Hash, ModuleId, Rep.Size, Size, AbsPath); } Src.Read(Scratch.data(), Size, 0); Hasher.Append(Scratch.data(), Size); Writer.Write(Scratch.data(), Size, Offset); Offset += Size; BP.Entries.push_back(BuiltPackEntry{.Hash = Rep.Hash, .Size = Rep.Size}); } Writer.Flush(); } BP.PackHash = Hasher.GetHash(); BP.Size = Offset; return BP; } // Schedules either a Put (if Hash is not in CAS) or a Touch (if it is). Updates // counters on the matching PhaseStats - Files++ in both cases, Bytes+=Size on the // touch path so touched-bytes accounting tracks size-equivalent work that did not // transfer. Both paths call RecordScheduled so the queue-wait line covers cache-warm // dehydrates (only Touches scheduled). Used for both loose CAS (UploadStats=Stats.Upload, // TouchStats=Stats.Touch) and pack blobs (UploadStats=Stats.PackUpload, TouchStats= // Stats.PackTouch). UploadStats and TouchStats must be distinct PhaseStats so the upload- // throughput metric is not inflated by touched bytes that did not transfer. void ScheduleUploadOrTouch(StorageBase& Storage, ParallelWork& Work, WorkerThreadPool& Pool, const std::unordered_set& ExistsLookup, const IoHash& Hash, uint64_t Size, const std::filesystem::path& SourcePath, PhaseStats& UploadStats, PhaseStats& TouchStats) { if (ExistsLookup.contains(Hash)) { // Refresh the backend's modification time so lifecycle-expiration policies // do not evict CAS entries that are still referenced by this module. 
Storage.Touch(Work, Pool, Hash, TouchStats); TouchStats.Files.fetch_add(1, std::memory_order_relaxed); TouchStats.Bytes.fetch_add(Size, std::memory_order_relaxed); TouchStats.RecordScheduled(); } else { Storage.Put(Work, Pool, Hash, Size, SourcePath, UploadStats); UploadStats.Files.fetch_add(1, std::memory_order_relaxed); UploadStats.RecordScheduled(); } } // Builds and saves the dehydrate state.cbo: header fields, optional Packs[] array, // and the Files[] array. ModuleId is stored in the manifest. void WriteDehydrateMetadata(StorageBase& Storage, const std::filesystem::path& ServerStateDir, std::string_view ModuleId, uint64_t TotalBytes, uint64_t DehydrateDurationMs, const std::vector& BuiltPacks, const std::vector& Entries) { UtcTime Now = UtcTime::Now(); std::string DehydrateTimeUtc = fmt::format("{:04d}-{:02d}-{:02d}T{:02d}:{:02d}:{:02d}.{:03d}Z"sv, Now.Tm.tm_year + 1900, Now.Tm.tm_mon + 1, Now.Tm.tm_mday, Now.Tm.tm_hour, Now.Tm.tm_min, Now.Tm.tm_sec, Now.Ms); CbObjectWriter Meta; Meta << "SchemaVersion"sv << HydrationSchemaVersion; Meta << "SourceFolder"sv << ServerStateDir.generic_string(); Meta << "ModuleId"sv << ModuleId; Meta << "HostName"sv << GetMachineName(); Meta << "DehydrateTimeUtc"sv << DehydrateTimeUtc; Meta << "DehydrateDurationMs"sv << DehydrateDurationMs; Meta << "TotalSizeBytes"sv << TotalBytes; Meta << "StorageSettings"sv << Storage.GetSettings(); if (!BuiltPacks.empty()) { Meta.BeginArray("Packs"sv); for (const BuiltPack& BP : BuiltPacks) { Meta.BeginObject(); { Meta << "Hash"sv << BP.PackHash; Meta << "Size"sv << BP.Size; Meta.BeginArray("Entries"sv); for (const BuiltPackEntry& BPE : BP.Entries) { Meta.BeginObject(); { Meta << "Hash"sv << BPE.Hash; Meta << "Size"sv << BPE.Size; } Meta.EndObject(); } Meta.EndArray(); } Meta.EndObject(); } Meta.EndArray(); } Meta.BeginArray("Files"sv); for (const Entry& CurrentEntry : Entries) { Meta.BeginObject(); { Meta << "Path"sv << CurrentEntry.RelativePath; Meta << "Size"sv << CurrentEntry.Size; Meta << 
"ModTick"sv << CurrentEntry.ModTick; Meta << "Hash"sv << CurrentEntry.Hash; if (CurrentEntry.IsPacked) { Meta << "PackHash"sv << CurrentEntry.PackHash; } } Meta.EndObject(); } Meta.EndArray(); Storage.SaveMetadata(Meta.Save()); } // Parses Meta["Files"sv] into Entries[] + path lookup. Reads PackHash and sets // IsPacked when present. Used by Hydrate. void ParseFilesArray(const CbObject& Meta, std::vector& OutEntries, std::unordered_map& OutLookup, uint64_t& OutTotalSize) { OutTotalSize = 0; for (CbFieldView FieldView : Meta["Files"sv]) { CbObjectView EntryView = FieldView.AsObjectView(); if (EntryView) { Entry NewEntry = {.RelativePath{EntryView["Path"sv].AsString()}, .Size = EntryView["Size"sv].AsUInt64(), .ModTick = EntryView["ModTick"sv].AsUInt64(), .Hash = EntryView["Hash"sv].AsHash()}; IoHash PackHash = EntryView["PackHash"sv].AsHash(); if (PackHash != IoHash::Zero) { NewEntry.IsPacked = true; NewEntry.PackHash = PackHash; } OutTotalSize += NewEntry.Size; OutLookup.insert_or_assign(NewEntry.RelativePath, OutEntries.size()); OutEntries.emplace_back(std::move(NewEntry)); } } } // Parses Meta["Packs"sv] into a hash-keyed descriptor map. Each PackDescriptor's // Entries[] gets a prefix-sum offset for O(1) slice lookup at unpack time. 
std::unordered_map ParsePacksArray(const CbObject& Meta) { std::unordered_map PackMap; for (CbFieldView FieldView : Meta["Packs"sv]) { CbObjectView PackView = FieldView.AsObjectView(); if (!PackView) { continue; } IoHash PackHash = PackView["Hash"sv].AsHash(); PackDescriptor PD; PD.Size = PackView["Size"sv].AsUInt64(); uint64_t Offset = 0; for (CbFieldView EF : PackView["Entries"sv]) { CbObjectView EV = EF.AsObjectView(); if (!EV) { continue; } PackEntryDescriptor E{.Hash = EV["Hash"sv].AsHash(), .Size = EV["Size"sv].AsUInt64(), .Offset = Offset}; Offset += E.Size; PD.Entries.push_back(E); } PackMap.emplace(PackHash, std::move(PD)); } return PackMap; } // For each downloaded pack: read it into a heap buffer, verify size, and slice // into per-entry IoBuffers (zero-copy views). Throws on size mismatch with the // existing message format. std::unordered_map BuildHashToSlice( const std::unordered_map& PackMap, const std::filesystem::path& TempDir, std::string_view ModuleId) { std::unordered_map HashToSlice; size_t TotalPackEntries = 0; for (const auto& [PackHash, PD] : PackMap) { TotalPackEntries += PD.Entries.size(); } HashToSlice.reserve(TotalPackEntries); for (const auto& [PackHash, PD] : PackMap) { std::filesystem::path PackPath = TempDir / "packs" / fmt::format("{}.bin"sv, PackHash); // Heap-allocated buffer via direct ReadFile avoids mmap materialization // and page-fault latency during the parallel unpack-write that follows. BasicFile PackFile(PackPath, BasicFile::Mode::kRead); IoBuffer PackBuf = PackFile.ReadAll(); if (PackBuf.GetSize() != PD.Size) { throw zen::runtime_error("Pack '{}' size mismatch for module '{}' at '{}': expected {}, got {}"sv, PackHash, ModuleId, PackPath, PD.Size, PackBuf.GetSize()); } for (const auto& E : PD.Entries) { HashToSlice.emplace(E.Hash, IoBuffer(PackBuf, E.Offset, E.Size)); } } return HashToSlice; } // Migrates contents of SourceDir into ServerStateDir. Same-volume: top-level rename // per child. 
Different-volume: full CopyTree fallback. Caller is responsible for // final cleanup of the parent temp directory (which may hold sibling staging dirs // like packs/ that must NOT migrate). void MigrateTempToState(const std::filesystem::path& SourceDir, const std::filesystem::path& ServerStateDir, const HydrationConfig::ThreadingOptions& Threading) { // If the two paths share at least one common component they are on the same drive/volume // and atomic renames will succeed. Otherwise fall back to a full copy. auto [ItSrc, ItState] = std::mismatch(SourceDir.begin(), SourceDir.end(), ServerStateDir.begin(), ServerStateDir.end()); if (ItSrc != SourceDir.begin()) { DirectoryContent DirContent; GetDirectoryContent(*Threading.WorkerPool, SourceDir, DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::IncludeDirs, DirContent); for (const std::filesystem::path& AbsPath : DirContent.Directories) { std::filesystem::path Dest = MakeSafeAbsolutePath(ServerStateDir / AbsPath.filename()); std::error_code Ec = RenameDirectoryWithRetry(AbsPath, Dest); if (Ec) { throw std::system_error(Ec, fmt::format("Failed to rename directory from '{}' to '{}'"sv, AbsPath, Dest)); } } for (const std::filesystem::path& AbsPath : DirContent.Files) { std::filesystem::path Dest = MakeSafeAbsolutePath(ServerStateDir / AbsPath.filename()); std::error_code Ec = RenameFileWithRetry(AbsPath, Dest); if (Ec) { throw std::system_error(Ec, fmt::format("Failed to rename file from '{}' to '{}'"sv, AbsPath, Dest)); } } } else { // Slow path: source and target are on different filesystems, so rename // would fail. Copy the tree instead. ZEN_DEBUG("SourceDir and ServerStateDir are on different filesystems - using CopyTree"); CopyTree(SourceDir, ServerStateDir, {.EnableClone = true}); } } // Walks ServerStateDir and emits a Files[] cache for the next dehydrate's // hash-shortcut (mirrors Load state on dehydrate). 
Files on disk that aren't in // EntryLookup (manifest) are skipped with a WARN - typically leftovers from an // earlier crashed hydrate. CbObject BuildHydrateState(const std::filesystem::path& ServerStateDir, const std::unordered_map& EntryLookup, const std::vector& Entries, std::string_view ModuleId, const HydrationConfig::ThreadingOptions& Threading) { DirectoryContent DirContent; GetDirectoryContent(*Threading.WorkerPool, ServerStateDir, DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::Recursive | DirectoryContentFlags::IncludeFileSizes | DirectoryContentFlags::IncludeModificationTick, DirContent); CbObjectWriter HydrateState; HydrateState.BeginArray("Files"sv); for (size_t FileIndex = 0; FileIndex < DirContent.Files.size(); FileIndex++) { std::filesystem::path RelativePath = FastRelativePath(ServerStateDir, DirContent.Files[FileIndex]); std::string RelKey = RelativePath.generic_string(); if (auto It = EntryLookup.find(RelKey); It != EntryLookup.end()) { HydrateState.BeginObject(); { HydrateState << "Path"sv << RelKey; HydrateState << "Size"sv << DirContent.FileSizes[FileIndex]; HydrateState << "ModTick"sv << DirContent.FileModificationTicks[FileIndex]; HydrateState << "Hash"sv << Entries[It->second].Hash; } HydrateState.EndObject(); } else { // File on disk after hydrate but not in the manifest. Can happen when TempDir // contained leftovers from a prior crashed hydrate that survived to the rename // phase. Skip it rather than failing - the manifest is the source of truth for // the cached state; the stray file is harmless and gets caught by the next // dehydrate's directory scan. 
ZEN_WARN("Hydrate: file '{}' present on disk but missing from manifest for module '{}'; skipping", RelKey, ModuleId); } } HydrateState.EndArray(); return HydrateState.Save(); } /////////////////////////////////////////////////////////////////////// // IncrementalHydrator implementations IncrementalHydrator::IncrementalHydrator(const HydrationConfig& Config, std::unique_ptr Storage, std::span Excludes) : m_Storage(std::move(Storage)) , m_Config(Config) , m_Excludes(Excludes.begin(), Excludes.end()) , m_FallbackWorkPool(0) { if (Config.Threading) { m_Threading = *Config.Threading; } } IncrementalHydrator::~IncrementalHydrator() { m_Storage.reset(); } void IncrementalHydrator::Dehydrate(const CbObject& CachedState) { ZEN_TRACE_CPU("IncrementalHydrator::Dehydrate"); Stopwatch TotalTimer; DehydrateStatistics Stats; const std::string StorageTarget = m_Storage->Describe(); const std::filesystem::path ServerStateDir = MakeSafeAbsolutePath(m_Config.ServerStateDir); try { // Load the cache from the previous dehydrate to short-circuit re-hashing of // unchanged files (matched by Path+Size+ModTick). std::unordered_map StateEntryLookup; std::vector StateEntries; { Stopwatch LoadStateTimer; LoadCachedStateEntries(CachedState, StateEntryLookup, StateEntries); Stats.LoadStateUs = LoadStateTimer.GetElapsedTimeUs(); } // Scan the server state directory. DirectoryContent DirContent; { Stopwatch DirScanTimer; GetDirectoryContent(*m_Threading.WorkerPool, ServerStateDir, DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::Recursive | DirectoryContentFlags::IncludeFileSizes | DirectoryContentFlags::IncludeModificationTick, DirContent); Stats.DirScanUs = DirScanTimer.GetElapsedTimeUs(); } ZEN_INFO("Dehydrating module '{}' from folder '{}'. 
{} ({}) files", m_Config.ModuleId, m_Config.ServerStateDir, DirContent.Files.size(), NiceBytes(std::accumulate(DirContent.FileSizes.begin(), DirContent.FileSizes.end(), uint64_t(0)))); // Hash phase: build Entries[] and schedule hash work for files not in the cache. // Storage::List runs in parallel with hashing to populate ExistsLookup before Wait. std::vector Entries; Entries.resize(DirContent.Files.size()); uint64_t TotalBytes = 0; uint64_t TotalFiles = 0; std::unordered_set ExistsLookup; { Stats.Hash.PhaseClock.Reset(); Stopwatch HashTimer; ParallelWork Work(*m_Threading.AbortFlag, *m_Threading.PauseFlag, WorkerThreadPool::EMode::EnableBacklog); TotalFiles = ScanAndScheduleHashWork(DirContent, ServerStateDir, StateEntryLookup, StateEntries, m_Excludes, Entries, TotalBytes, Work, *m_Threading.WorkerPool, Stats.Hash); { Stopwatch ListTimer; std::vector ExistingEntries = m_Storage->List(); ExistsLookup.insert(ExistingEntries.begin(), ExistingEntries.end()); Stats.ListExistingUs = ListTimer.GetElapsedTimeUs(); } Work.Wait(); Entries.resize(TotalFiles); Stats.Hash.ElapsedUs = HashTimer.GetElapsedTimeUs(); Stats.TotalFiles = TotalFiles; Stats.TotalBytes = TotalBytes; } // Pack planning + unified upload phase. Plan first so we know which entries are // packed, then run loose-CAS uploads and pack builds inside a single ParallelWork. // Loose uploads are scheduled up front so they execute on the worker pool while // the calling thread runs the serial pack-build loop; each completed pack hands // its upload to the same ParallelWork. One Wait covers everything. std::vector BuiltPacks; std::vector StagedPackFiles; auto PackCleanup = MakeGuard([&] { RemoveStagedPackFiles(StagedPackFiles); // Best-effort drop of the now-empty packs/ subdir so TempDir is clean after // dehydrate. Mirrors the explicit cleanup on the hydrate-side success path. 
std::error_code Ec; DeleteDirectories(MakeSafeAbsolutePath(m_Config.TempDir) / "packs", Ec); }); // PlanPacks tags Entries[Idx].IsPacked on every index that survives into a pack, // so the loose-upload loop can skip them. PackHash is set later per-pack as each // pack is built. const std::vector Pending = m_Config.PackEnabled ? PlanPacks(Entries, m_Config.PackThresholdBytes, m_Config.MaxPackBytes) : std::vector{}; uint64_t DehydrateDurationMs = 0; { // Upload, PackUpload, Touch, and PackTouch share one ParallelWork; reset all // four PhaseClocks to the same baseline so the queue-wait line can combine // their FirstScheduleUs / FirstStartUs across the four PhaseStats. Stats.Upload.PhaseClock.Reset(); Stats.PackUpload.PhaseClock.Reset(); Stats.Touch.PhaseClock.Reset(); Stats.PackTouch.PhaseClock.Reset(); Stopwatch UploadTimer; ParallelWork Work(*m_Threading.AbortFlag, *m_Threading.PauseFlag, WorkerThreadPool::EMode::EnableBacklog); // Schedule loose-CAS uploads first so they begin running while the pack-build // loop below executes serially on this thread. for (const Entry& CurrentEntry : Entries) { if (CurrentEntry.IsPacked) { continue; // pack phase covers it } ScheduleUploadOrTouch(*m_Storage, Work, *m_Threading.WorkerPool, ExistsLookup, CurrentEntry.Hash, CurrentEntry.Size, MakeSafeAbsolutePath(ServerStateDir / CurrentEntry.RelativePath), Stats.Upload, Stats.Touch); } if (!Pending.empty()) { ZEN_TRACE_CPU("IncrementalHydrator::Dehydrate::Pack"); std::filesystem::path TempDir = MakeSafeAbsolutePath(m_Config.TempDir); std::filesystem::path PacksDir = TempDir / "packs"; CreateDirectories(PacksDir); // Reusable scratch for small-file reads. Every pack candidate has Size < // PackThresholdBytes so a single buffer of that size holds any one file. // Build runs serially on the caller's thread - typical modules produce 1-2 // packs at ~5 ms each, too small to be worth the parallel-dispatch overhead. 
std::vector Scratch(m_Config.PackThresholdBytes); for (const PackPlan& Plan : Pending) { // Pre-register the staging path so PackCleanup removes it even if the // stream-write loop below throws mid-flight. Oid::String_t OidStr; Oid::NewOid().ToString(OidStr); std::filesystem::path PackTempPath = PacksDir / fmt::format("{}.bin"sv, OidStr); StagedPackFiles.push_back(PackTempPath); Stopwatch BuildTimer; BuiltPack BP = BuildPack(Plan, Entries, ServerStateDir, PackTempPath, m_Config.ModuleId, Scratch); Stats.PackBuildUs.fetch_add(BuildTimer.GetElapsedTimeUs(), std::memory_order_relaxed); // Stamp the pack hash on every matching entry; state.cbo's Files[] reads // PackHash off these entries when emitting per-file PackHash references. uint64_t PackedEntryCount = 0; for (const EntryGroup& Group : Plan) { for (size_t Idx : Group) { Entries[Idx].PackHash = BP.PackHash; } PackedEntryCount += Group.size(); } Stats.PackCount.fetch_add(1, std::memory_order_relaxed); Stats.PackedFiles.fetch_add(PackedEntryCount, std::memory_order_relaxed); Stats.PackBytes.fetch_add(BP.Size, std::memory_order_relaxed); ScheduleUploadOrTouch(*m_Storage, Work, *m_Threading.WorkerPool, ExistsLookup, BP.PackHash, BP.Size, PackTempPath, Stats.PackUpload, Stats.PackTouch); BuiltPacks.push_back(std::move(BP)); } } Work.Wait(); // Upload, PackUpload, Touch, and PackTouch share a single ParallelWork. Only // Upload's ElapsedUs is read by the formatter; the others' bytes/requests are // reported against the same Upload phase elapsed. Stats.Upload.ElapsedUs = UploadTimer.GetElapsedTimeUs(); DehydrateDurationMs = TotalTimer.GetElapsedTimeMs(); } // Persist the new state.cbo with header, Packs[], and Files[]. { Stopwatch SaveMetadataTimer; WriteDehydrateMetadata(*m_Storage, ServerStateDir, m_Config.ModuleId, TotalBytes, DehydrateDurationMs, BuiltPacks, Entries); Stats.SaveMetadataUs = SaveMetadataTimer.GetElapsedTimeUs(); } // Server-state dir contents have been uploaded; wipe them. 
ZEN_DEBUG("Cleaning server state '{}'", m_Config.ServerStateDir); { Stopwatch CleanTimer; CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, ServerStateDir); Stats.CleanUs = CleanTimer.GetElapsedTimeUs(); } Stats.TotalUs = TotalTimer.GetElapsedTimeUs(); LogDehydrateSummary("Dehydration complete"sv, Stats, m_Config.ModuleId, ServerStateDir, StorageTarget); } catch (const std::exception& Ex) { ZEN_WARN("Dehydration of module '{}' failed: {}. Leaving server state '{}'", m_Config.ModuleId, Ex.what(), m_Config.ServerStateDir); Stats.TotalUs = TotalTimer.GetElapsedTimeUs(); LogDehydrateSummary("Dehydration failed"sv, Stats, m_Config.ModuleId, ServerStateDir, StorageTarget); } } CbObject IncrementalHydrator::Hydrate() { ZEN_TRACE_CPU("IncrementalHydrator::Hydrate"); Stopwatch TotalTimer; HydrateStatistics Stats; const std::string StorageSource = m_Storage->Describe(); const std::filesystem::path ServerStateDir = MakeSafeAbsolutePath(m_Config.ServerStateDir); const std::filesystem::path TempDir = MakeSafeAbsolutePath(m_Config.TempDir); // Hydrated files land in TempDir/state/, pack staging blobs in TempDir/packs/. Keeping // them in sibling subdirectories means MigrateTempToState only needs to hand the state/ // subtree across to ServerStateDir; pack staging never has a chance to leak into the // final server state directory. const std::filesystem::path TempStateDir = TempDir / "state"; try { CreateDirectories(ServerStateDir); CreateDirectories(TempDir); // A prior hydrate may have crashed after downloading but before the rename phase, // leaving stale files in TempDir that would otherwise get migrated into // ServerStateDir and trip the post-rename manifest check. CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, TempDir); CreateDirectories(TempStateDir); // Load metadata; absent metadata means a fresh module - clean state, return. 
CbObject Meta; { Stopwatch LoadTimer; Meta = m_Storage->LoadMetadata(); Stats.LoadMetadataUs = LoadTimer.GetElapsedTimeUs(); } if (!Meta) { ZEN_INFO("No dehydrated state for module {} found, cleaning server state: '{}'", m_Config.ModuleId, m_Config.ServerStateDir); CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, ServerStateDir); return CbObject(); } // Schema-version gate: refuse manifests written by a newer hub. Missing field is // treated as version 0 (legacy / pre-versioning) and decoded best-effort - the // optional fields (Packs[], StorageSettings) absent from v0 manifests fall back // to defaults via ParsePacksArray / ParseSettings. const uint32_t SchemaVersion = Meta["SchemaVersion"sv].AsUInt32(0); if (SchemaVersion > HydrationSchemaVersion) { throw zen::runtime_error("State manifest for module '{}' has schema version {} but this hub supports up to {}"sv, m_Config.ModuleId, SchemaVersion, HydrationSchemaVersion); } // Parse manifest: Files[] for per-file metadata, Packs[] (optional) for pack // composition. Missing Packs[] = old-format state; treated as all-standalone. std::unordered_map EntryLookup; std::vector Entries; uint64_t TotalSize = 0; ParseFilesArray(Meta, Entries, EntryLookup, TotalSize); std::unordered_map PackMap = ParsePacksArray(Meta); Stats.TotalFiles = Entries.size(); Stats.TotalBytes = TotalSize; Stats.PackCount = PackMap.size(); ZEN_INFO("Hydrating module '{}' to folder '{}'. {} ({}) files, {} packs", m_Config.ModuleId, m_Config.ServerStateDir, Entries.size(), NiceBytes(TotalSize), PackMap.size()); // Re-apply storage settings from state.cbo (e.g. S3 multipart chunk size). m_Storage->ParseSettings(Meta["StorageSettings"sv].AsObjectView()); // Per-entry destination paths under TempStateDir, indexed parallel to Entries[]. Used // by the standalone download dispatch and (for IsPacked entries) by the unpack // dispatch. Pre-creating parents once for the union covers both phases without a second pass. 
std::vector EntryPaths; EntryPaths.reserve(Entries.size()); for (const Entry& CurrentEntry : Entries) { EntryPaths.push_back(MakeSafeAbsolutePath(TempStateDir / CurrentEntry.RelativePath)); } { Stopwatch CreateDirsTimer; auto RecordElapsed = MakeGuard([&] { Stats.CreateDirsUs = CreateDirsTimer.GetElapsedTimeUs(); }); Stats.CreateDirsCount = CreateParentDirectories(EntryPaths); } // Download phase: pack GETs first (so unpack can begin sooner), then standalone files. // Both share the same ParallelWork; per-phase byte / request counts stay separate via // PackDownload vs Download stats while the elapsed time is reported once. { // Download and PackDownload share one ParallelWork; reset both PhaseClocks // to the same baseline so the queue-wait line can combine their FirstScheduleUs // / FirstStartUs across the two PhaseStats. Stats.Download.PhaseClock.Reset(); Stats.PackDownload.PhaseClock.Reset(); Stopwatch DownloadTimer; ParallelWork Work(*m_Threading.AbortFlag, *m_Threading.PauseFlag, WorkerThreadPool::EMode::EnableBacklog); const std::filesystem::path PacksDir = TempDir / "packs"; if (!PackMap.empty()) { CreateDirectories(PacksDir); } for (const auto& [PackHash, PD] : PackMap) { std::filesystem::path PackPath = PacksDir / fmt::format("{}.bin"sv, PackHash); m_Storage->Get(Work, *m_Threading.WorkerPool, PackHash, PD.Size, PackPath, Stats.PackDownload); Stats.PackDownload.Files.fetch_add(1, std::memory_order_relaxed); Stats.PackDownload.RecordScheduled(); } for (size_t I = 0; I < Entries.size(); ++I) { if (Entries[I].IsPacked) { continue; // handled in the unpack phase below } m_Storage->Get(Work, *m_Threading.WorkerPool, Entries[I].Hash, Entries[I].Size, EntryPaths[I], Stats.Download); Stats.Download.Files.fetch_add(1, std::memory_order_relaxed); Stats.Download.RecordScheduled(); } Work.Wait(); Stats.Download.ElapsedUs = DownloadTimer.GetElapsedTimeUs(); } // Unpack phase: verify each downloaded pack, build hash->slice map, parallel-write. 
if (!PackMap.empty()) { ZEN_TRACE_CPU("IncrementalHydrator::Hydrate::Unpack"); std::unordered_map HashToSlice = BuildHashToSlice(PackMap, TempDir, m_Config.ModuleId); { Stopwatch UnpackTimer; ParallelWork UnpackWork(*m_Threading.AbortFlag, *m_Threading.PauseFlag, WorkerThreadPool::EMode::EnableBacklog); for (size_t I = 0; I < Entries.size(); ++I) { if (!Entries[I].IsPacked) { continue; } auto It = HashToSlice.find(Entries[I].Hash); if (It == HashToSlice.end()) { throw zen::runtime_error("Packed file '{}' references unknown pack content hash '{}' in module '{}'"sv, Entries[I].RelativePath, Entries[I].Hash, m_Config.ModuleId); } UnpackWork.ScheduleWork(*m_Threading.WorkerPool, [&Stats, &Path = EntryPaths[I], Slice = &It->second](std::atomic& AbortFlag) { if (AbortFlag.load()) { return; } TemporaryFile::SafeWriteFile(Path, *Slice); Stats.UnpackWriteBytes.fetch_add(Slice->GetSize(), std::memory_order_relaxed); Stats.PackedFiles.fetch_add(1, std::memory_order_relaxed); }); } UnpackWork.Wait(); Stats.PackUnpackUs = UnpackTimer.GetElapsedTimeUs(); } // Release the pack buffers (each IoBuffer slice holds a ref to the pack's underlying // heap buffer) before the rename/verify phase runs - avoids keeping ~sum(pack sizes) // bytes alive across those phases. HashToSlice.clear(); } // Downloaded successfully - swap TempStateDir contents into ServerStateDir, then // sweep the rest of TempDir (empty TempStateDir, packs/, anything else). 
ZEN_DEBUG("Cleaning server state '{}'", m_Config.ServerStateDir); { Stopwatch CleanTimer; CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, ServerStateDir); Stats.CleanUs = CleanTimer.GetElapsedTimeUs(); } { Stopwatch FinalizeTimer; MigrateTempToState(TempStateDir, ServerStateDir, m_Threading); CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, TempDir); Stats.FinalizeUs = FinalizeTimer.GetElapsedTimeUs(); } // Build the cached state that the next Dehydrate will receive (mirrors Load state on dehydrate). CbObject StateObject; { Stopwatch BuildStateTimer; StateObject = BuildHydrateState(ServerStateDir, EntryLookup, Entries, m_Config.ModuleId, m_Threading); Stats.BuildStateUs = BuildStateTimer.GetElapsedTimeUs(); } Stats.TotalUs = TotalTimer.GetElapsedTimeUs(); LogHydrateSummary("Hydration complete"sv, Stats, m_Config.ModuleId, StorageSource, ServerStateDir); return StateObject; } catch (const std::exception& Ex) { ZEN_WARN("Hydration of module '{}' failed: {}. 
Cleaning server state '{}'", m_Config.ModuleId, Ex.what(), m_Config.ServerStateDir); CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, ServerStateDir); ZEN_DEBUG("Cleaning temp dir '{}'", m_Config.TempDir); CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, TempDir); Stats.TotalUs = TotalTimer.GetElapsedTimeUs(); LogHydrateSummary("Hydration failed"sv, Stats, m_Config.ModuleId, StorageSource, ServerStateDir); return {}; } } void IncrementalHydrator::Obliterate() { ZEN_TRACE_CPU("IncrementalHydrator::Obliterate"); const std::filesystem::path ServerStateDir = MakeSafeAbsolutePath(m_Config.ServerStateDir); const std::filesystem::path TempDir = MakeSafeAbsolutePath(m_Config.TempDir); auto TryDeleteBackend = [&]() { ParallelWork Work(*m_Threading.AbortFlag, *m_Threading.PauseFlag, WorkerThreadPool::EMode::EnableBacklog); m_Storage->Delete(Work, *m_Threading.WorkerPool); Work.Wait(); }; try { TryDeleteBackend(); } catch (const std::exception& Ex) { ZEN_WARN("Obliterate backend delete failed for module '{}' (attempt 1/2): {}. Retrying once.", m_Config.ModuleId, Ex.what()); try { TryDeleteBackend(); } catch (const std::exception& Ex2) { ZEN_WARN( "Obliterate backend delete failed for module '{}' (attempt 2/2): {}. Proceeding with local cleanup; backend data may " "remain.", m_Config.ModuleId, Ex2.what()); } } CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, ServerStateDir); CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, TempDir); } } // namespace hydration_impl /////////////////////////////////////////////////////////////////////////// // HydrationBase subclasses - own hub-wide backend state, hand per-module // storages the exact inputs they need in CreateHydrator. 
class FileHydration : public HydrationBase { public: explicit FileHydration(const Configuration& Config); virtual std::unique_ptr CreateHydrator(const HydrationConfig& Config) override; private: std::filesystem::path m_StorageRoot; }; class S3Hydration : public HydrationBase { public: explicit S3Hydration(const Configuration& Config); virtual std::unique_ptr CreateHydrator(const HydrationConfig& Config) override; private: std::string m_Bucket; std::string m_Region; std::string m_Endpoint; bool m_PathStyle = false; std::string m_KeyPrefixRoot; SigV4Credentials m_Credentials; Ref m_CredentialProvider; std::unique_ptr m_Client; uint64_t m_DefaultMultipartChunkSize; }; HydrationBase::HydrationBase(const Configuration& Config) { using namespace hydration_impl; CbFieldView ExcludesField = Config.Options["excludes"sv]; m_Excludes = ExcludesField.HasValue() ? ParseStringArray(ExcludesField) : DefaultExcludes(); } /////////////////////////////////////////////////////////////////////////// // Implementations FileHydration::FileHydration(const Configuration& Config) : HydrationBase(Config) { if (!Config.TargetSpecification.empty()) { m_StorageRoot = Utf8ToWide(Config.TargetSpecification.substr(hydration_impl::FileStorage::Prefix.length())); if (m_StorageRoot.empty()) { throw zen::runtime_error("Hydration config 'file' type requires a directory path"sv); } } else { CbObjectView Settings = Config.Options["settings"sv].AsObjectView(); std::string_view Path = Settings["path"sv].AsString(); if (Path.empty()) { throw zen::runtime_error("Hydration config 'file' type requires 'settings.path'"sv); } m_StorageRoot = Utf8ToWide(std::string(Path)); } MakeSafeAbsolutePathInPlace(m_StorageRoot); } std::unique_ptr FileHydration::CreateHydrator(const HydrationConfig& Config) { using namespace hydration_impl; return std::make_unique(Config, std::make_unique(m_StorageRoot / Config.ModuleId), m_Excludes); } S3Hydration::S3Hydration(const Configuration& Config) : HydrationBase(Config) { using 
namespace hydration_impl; CbObjectView Settings = Config.Options["settings"sv].AsObjectView(); std::string_view Spec; if (!Config.TargetSpecification.empty()) { Spec = Config.TargetSpecification; Spec.remove_prefix(S3Storage::Prefix.size()); } else { std::string_view Uri = Settings["uri"sv].AsString(); if (Uri.empty()) { throw zen::runtime_error("Incremental S3 hydration config requires 'settings.uri'"sv); } Spec = Uri; Spec.remove_prefix(S3Storage::Prefix.size()); } size_t SlashPos = Spec.find('/'); m_Bucket = std::string(SlashPos != std::string_view::npos ? Spec.substr(0, SlashPos) : Spec); m_KeyPrefixRoot = SlashPos != std::string_view::npos ? std::string(Spec.substr(SlashPos + 1)) : std::string{}; if (m_Bucket.empty()) { throw zen::runtime_error("Incremental S3 hydration config requires a bucket name"sv); } std::string Region = std::string(Settings["region"sv].AsString()); if (Region.empty()) { Region = GetEnvVariable("AWS_DEFAULT_REGION").value_or(""); } if (Region.empty()) { Region = GetEnvVariable("AWS_REGION").value_or(""); } if (Region.empty()) { Region = "us-east-1"; } m_Region = std::move(Region); std::string_view Endpoint = Settings["endpoint"sv].AsString(); if (!Endpoint.empty()) { m_Endpoint = std::string(Endpoint); m_PathStyle = Settings["path-style"sv].AsBool(); } std::string AccessKeyId = GetEnvVariable("AWS_ACCESS_KEY_ID").value_or(""); if (AccessKeyId.empty()) { m_CredentialProvider = Ref(new ImdsCredentialProvider({})); } else { m_Credentials.AccessKeyId = std::move(AccessKeyId); m_Credentials.SecretAccessKey = GetEnvVariable("AWS_SECRET_ACCESS_KEY").value_or(""); m_Credentials.SessionToken = GetEnvVariable("AWS_SESSION_TOKEN").value_or(""); } m_DefaultMultipartChunkSize = Settings["chunksize"sv].AsUInt64(DefaultMultipartChunkSize); S3ClientOptions ClientOptions; ClientOptions.BucketName = m_Bucket; ClientOptions.Region = m_Region; ClientOptions.Endpoint = m_Endpoint; ClientOptions.PathStyle = m_PathStyle; if (m_CredentialProvider) { 
ClientOptions.CredentialProvider = m_CredentialProvider; } else { ClientOptions.Credentials = m_Credentials; } ClientOptions.HttpSettings.MaximumInMemoryDownloadSize = 16u * 1024u; // Retry transient HTTP failures (429 throttle, 503 SlowDown, 5xx, connection errors) at the // HTTP layer. CurlHttpClient::DoWithRetry uses 100*(Attempt+1) ms linear backoff between // attempts. Three retries covers brief S3 rate-limit bursts without holding worker threads // for long under sustained throttle. ClientOptions.HttpSettings.RetryCount = 3; m_Client = std::make_unique(ClientOptions); } std::unique_ptr S3Hydration::CreateHydrator(const HydrationConfig& Config) { using namespace hydration_impl; std::string KeyPrefix = m_KeyPrefixRoot.empty() ? std::string(Config.ModuleId) : fmt::format("{}/{}"sv, m_KeyPrefixRoot, Config.ModuleId); return std::make_unique( Config, std::make_unique(*m_Client, std::move(KeyPrefix), Config.TempDir, m_DefaultMultipartChunkSize), m_Excludes); } std::unique_ptr InitHydration(const HydrationBase::Configuration& Config) { using namespace hydration_impl; if (!Config.TargetSpecification.empty()) { if (StrCaseCompare(Config.TargetSpecification.substr(0, FileStorage::Prefix.length()), FileStorage::Prefix) == 0) { return std::make_unique(Config); } if (StrCaseCompare(Config.TargetSpecification.substr(0, S3Storage::Prefix.length()), S3Storage::Prefix) == 0) { return std::make_unique(Config); } throw zen::runtime_error("Unknown hydration strategy: {}"sv, Config.TargetSpecification); } std::string_view Type = Config.Options["type"sv].AsString(); if (Type == FileStorage::Type) { return std::make_unique(Config); } if (Type == S3Storage::Type) { return std::make_unique(Config); } if (!Type.empty()) { throw zen::runtime_error("Unknown hydration target type '{}'"sv, Type); } throw zen::runtime_error("No hydration target configured"sv); } #if ZEN_WITH_TESTS namespace { struct TestThreading { WorkerThreadPool WorkerPool; std::atomic AbortFlag{false}; std::atomic 
PauseFlag{false}; HydrationConfig::ThreadingOptions Options{.WorkerPool = &WorkerPool, .AbortFlag = &AbortFlag, .PauseFlag = &PauseFlag}; explicit TestThreading(int ThreadCount) : WorkerPool(ThreadCount) {} }; /// Create a small file hierarchy under BaseDir: /// file_a.bin /// subdir/file_b.bin /// subdir/nested/file_c.bin /// Returns a vector of (relative path, content) pairs for later verification. typedef std::vector> TestFileList; TestFileList AddTestFiles(const std::filesystem::path& BaseDir, TestFileList& Files) { auto AddFile = [&](std::filesystem::path RelPath, IoBuffer Content) { std::filesystem::path FullPath = BaseDir / RelPath; CreateDirectories(FullPath.parent_path()); WriteFile(FullPath, Content); Files.emplace_back(std::move(RelPath), std::move(Content)); }; AddFile("file_a.bin", CreateSemiRandomBlob(1024)); AddFile("subdir/file_b.bin", CreateSemiRandomBlob(2048)); AddFile("subdir/nested/file_c.bin", CreateSemiRandomBlob(512)); AddFile("subdir/nested/file_d.bin", CreateSemiRandomBlob(512)); AddFile("subdir/nested/file_e.bin", CreateSemiRandomBlob(512)); AddFile("subdir/nested/file_f.bin", CreateSemiRandomBlob(512)); return Files; } TestFileList CreateSmallTestTree(const std::filesystem::path& BaseDir) { TestFileList Files; AddTestFiles(BaseDir, Files); return Files; } TestFileList CreateTestTree(const std::filesystem::path& BaseDir) { TestFileList Files; AddTestFiles(BaseDir, Files); auto AddFile = [&](std::filesystem::path RelPath, IoBuffer Content) { std::filesystem::path FullPath = BaseDir / RelPath; CreateDirectories(FullPath.parent_path()); WriteFile(FullPath, Content); Files.emplace_back(std::move(RelPath), std::move(Content)); }; AddFile("subdir/nested/medium.bulk", CreateSemiRandomBlob(256u * 1024u)); AddFile("subdir/nested/big.bulk", CreateSemiRandomBlob(512u * 1024u)); AddFile("subdir/nested/huge.bulk", CreateSemiRandomBlob(9u * 1024u * 1024u)); AddFile("subdir/nested/biggest.bulk", CreateSemiRandomBlob(63u * 1024u * 1024u)); return Files; 
} void VerifyTree(const std::filesystem::path& Dir, const std::vector>& Expected) { for (const auto& [RelPath, Content] : Expected) { std::filesystem::path FullPath = Dir / RelPath; REQUIRE_MESSAGE(std::filesystem::exists(FullPath), FullPath.string()); BasicFile ReadBack(FullPath, BasicFile::Mode::kRead); IoBuffer ReadContent = ReadBack.ReadRange(0, ReadBack.FileSize()); REQUIRE_EQ(ReadContent.GetSize(), Content.GetSize()); CHECK(std::memcmp(ReadContent.GetData(), Content.GetData(), Content.GetSize()) == 0); } } // Test fixture that centralizes the common scaffolding for file-backed hydration tests: // a scratch temp dir containing the server state, the hydration store, and the hydration // temp dir, plus an initialized FileHydration backend. struct FileHarness { ScopedTemporaryDirectory TempDir; std::filesystem::path ServerStateDir = TempDir.Path() / "server_state"; std::filesystem::path HydrationStore = TempDir.Path() / "hydration_store"; std::filesystem::path HydrationTemp = TempDir.Path() / "hydration_temp"; std::unique_ptr Hydration; FileHarness() { CreateDirectories(ServerStateDir); CreateDirectories(HydrationStore); CreateDirectories(HydrationTemp); Hydration = InitHydration({.TargetSpecification = "file://" + HydrationStore.string()}); } HydrationConfig MakeConfig(std::string_view ModuleId, HydrationConfig Overrides = {}) const { Overrides.ServerStateDir = ServerStateDir; Overrides.TempDir = HydrationTemp; Overrides.ModuleId = std::string(ModuleId); return Overrides; } }; } // namespace TEST_SUITE_BEGIN("server.hydration"); // --------------------------------------------------------------------------- // FileHydrator tests // --------------------------------------------------------------------------- TEST_CASE("hydration.file.dehydrate_hydrate") { FileHarness H; const auto TestFiles = CreateSmallTestTree(H.ServerStateDir); constexpr auto ModuleId = "testmodule"; const auto Config = H.MakeConfig(ModuleId); 
H.Hydration->CreateHydrator(Config)->Dehydrate(CbObject()); CHECK(std::filesystem::exists(H.HydrationStore / ModuleId)); CHECK(std::filesystem::is_empty(H.ServerStateDir)); H.Hydration->CreateHydrator(Config)->Hydrate(); VerifyTree(H.ServerStateDir, TestFiles); } TEST_CASE("hydration.file.hydrate_overwrites_existing_state") { FileHarness H; const auto TestFiles = CreateSmallTestTree(H.ServerStateDir); const auto Config = H.MakeConfig("testmodule"); H.Hydration->CreateHydrator(Config)->Dehydrate(CbObject()); // Stale file must be wiped by the rehydrate. WriteFile(H.ServerStateDir / "stale.bin", CreateSemiRandomBlob(256)); H.Hydration->CreateHydrator(Config)->Hydrate(); CHECK_FALSE(std::filesystem::exists(H.ServerStateDir / "stale.bin")); VerifyTree(H.ServerStateDir, TestFiles); } TEST_CASE("hydration.file.excluded_files_not_dehydrated") { FileHarness H; const auto TestFiles = CreateSmallTestTree(H.ServerStateDir); // Files matched by the built-in DefaultExcludes() set in hydration.cpp. Each must be // skipped during dehydrate and not be recreated by hydrate. 
CreateDirectories(H.ServerStateDir / "gc"); WriteFile(H.ServerStateDir / "gc" / "reserve.gc", CreateSemiRandomBlob(64)); CreateDirectories(H.ServerStateDir / ".sentry-native"); WriteFile(H.ServerStateDir / ".sentry-native" / "db.lock", CreateSemiRandomBlob(32)); WriteFile(H.ServerStateDir / ".sentry-native" / "breadcrumb.json", CreateSemiRandomBlob(128)); WriteFile(H.ServerStateDir / "state_marker", CreateSemiRandomBlob(16)); WriteFile(H.ServerStateDir / ".lock", CreateSemiRandomBlob(8)); WriteFile(H.ServerStateDir / "snapshot.bak", CreateSemiRandomBlob(48)); CreateDirectories(H.ServerStateDir / "auth"); WriteFile(H.ServerStateDir / "auth" / "authstate", CreateSemiRandomBlob(96)); const auto Config = H.MakeConfig("testmodule_excl"); H.Hydration->CreateHydrator(Config)->Dehydrate(CbObject()); CleanDirectory(H.ServerStateDir, true); H.Hydration->CreateHydrator(Config)->Hydrate(); VerifyTree(H.ServerStateDir, TestFiles); CHECK_FALSE(std::filesystem::exists(H.ServerStateDir / "gc" / "reserve.gc")); CHECK_FALSE(std::filesystem::exists(H.ServerStateDir / ".sentry-native")); CHECK_FALSE(std::filesystem::exists(H.ServerStateDir / "state_marker")); CHECK_FALSE(std::filesystem::exists(H.ServerStateDir / ".lock")); CHECK_FALSE(std::filesystem::exists(H.ServerStateDir / "snapshot.bak")); CHECK_FALSE(std::filesystem::exists(H.ServerStateDir / "auth" / "authstate")); } TEST_CASE("hydration.options.excludes_override") { // Explicit `excludes` replaces the built-in default list outright; `.lock` is not // in the override list, so it must appear in the manifest. Use the Options-only // path (type + settings.path) so the same Options object also carries the override // `excludes` array. 
ScopedTemporaryDirectory TempDir; std::filesystem::path ServerStateDir = TempDir.Path() / "state"; std::filesystem::path HydrationStore = TempDir.Path() / "store"; std::filesystem::path HydrationTemp = TempDir.Path() / "tmp"; CreateDirectories(ServerStateDir); CreateDirectories(HydrationStore); CreateDirectories(HydrationTemp); WriteFile(ServerStateDir / "regular.bin", CreateSemiRandomBlob(64)); WriteFile(ServerStateDir / ".lock", CreateSemiRandomBlob(8)); CbObjectWriter Options; Options << "type"sv << "file"sv; Options.BeginObject("settings"sv); { Options << "path"sv << HydrationStore.generic_string(); } Options.EndObject(); Options.BeginArray("excludes"sv); { Options << "*.tmp"sv; } Options.EndArray(); HydrationBase::Configuration HydrCfg{.Options = Options.Save()}; std::unique_ptr Hydration = InitHydration(HydrCfg); HydrationConfig PerModuleCfg{.ServerStateDir = ServerStateDir, .TempDir = HydrationTemp, .ModuleId = "excl_off"}; Hydration->CreateHydrator(PerModuleCfg)->Dehydrate(CbObject()); const std::filesystem::path StateFile = HydrationStore / "excl_off" / "current-state.cbo"; REQUIRE(std::filesystem::exists(StateFile)); FileContents Contents = ReadFile(StateFile); REQUIRE(Contents); IoBuffer Payload = Contents.Flatten(); CbValidateError Err; CbObject Meta = ValidateAndReadCompactBinaryObject(std::move(Payload), Err); REQUIRE_EQ(Err, CbValidateError::None); bool HasLock = false; for (CbFieldView F : Meta["Files"sv]) { if (F.AsObjectView()["Path"sv].AsString() == ".lock") { HasLock = true; break; } } CHECK(HasLock); } // --------------------------------------------------------------------------- // FileHydrator obliterate test // --------------------------------------------------------------------------- TEST_CASE("hydration.file.obliterate") { FileHarness H; constexpr std::string_view ModuleId = "obliterate_test"sv; CreateSmallTestTree(H.ServerStateDir); const auto Config = H.MakeConfig(ModuleId); H.Hydration->CreateHydrator(Config)->Dehydrate(CbObject()); 
CHECK(std::filesystem::exists(H.HydrationStore / ModuleId)); // Put files back in ServerStateDir + TempDir to verify cleanup. CreateSmallTestTree(H.ServerStateDir); WriteFile(H.HydrationTemp / "leftover.tmp", CreateSemiRandomBlob(64)); H.Hydration->CreateHydrator(Config)->Obliterate(); CHECK_FALSE(std::filesystem::exists(H.HydrationStore / ModuleId)); CHECK(std::filesystem::is_empty(H.ServerStateDir)); CHECK(std::filesystem::is_empty(H.HydrationTemp)); } // --------------------------------------------------------------------------- // Pack tests - exercise small-file packing, unpacking, and fallback paths. // --------------------------------------------------------------------------- TEST_CASE("hydration.file.pack_roundtrip") { // CreateSmallTestTree produces 6 files all < 2 KB -> single pack with every file in it. FileHarness H; const auto TestFiles = CreateSmallTestTree(H.ServerStateDir); const auto Config = H.MakeConfig("pack_roundtrip"); H.Hydration->CreateHydrator(Config)->Dehydrate(CbObject()); CHECK(std::filesystem::is_empty(H.ServerStateDir)); H.Hydration->CreateHydrator(Config)->Hydrate(); VerifyTree(H.ServerStateDir, TestFiles); } TEST_CASE("hydration.file.pack_disabled_fallback") { // PackEnabled=false -> every file is a standalone CAS entry regardless of size. FileHarness H; const auto TestFiles = CreateSmallTestTree(H.ServerStateDir); const auto Config = H.MakeConfig("pack_disabled", HydrationConfig{.PackEnabled = false}); H.Hydration->CreateHydrator(Config)->Dehydrate(CbObject()); H.Hydration->CreateHydrator(Config)->Hydrate(); VerifyTree(H.ServerStateDir, TestFiles); } TEST_CASE("hydration.file.pack_one_unique_fallback") { // Only 1 unique small-file candidate -> no pack (min 2 entries); falls back to standalone. 
FileHarness H; std::vector> TestFiles; IoBuffer Small = CreateSemiRandomBlob(128); WriteFile(H.ServerStateDir / "tiny.bin", Small); TestFiles.emplace_back("tiny.bin", std::move(Small)); IoBuffer Big = CreateSemiRandomBlob(8192); WriteFile(H.ServerStateDir / "big.bin", Big); TestFiles.emplace_back("big.bin", std::move(Big)); const auto Config = H.MakeConfig("pack_one_unique"); H.Hydration->CreateHydrator(Config)->Dehydrate(CbObject()); H.Hydration->CreateHydrator(Config)->Hydrate(); VerifyTree(H.ServerStateDir, TestFiles); } TEST_CASE("hydration.file.pack_duplicate_hashes") { // 10 files share one hash + 1 distinct file -> pack has 2 unique entries; hydrate writes // all 11 destinations correctly. FileHarness H; IoBuffer Shared = CreateSemiRandomBlob(256); IoBuffer Other = CreateSemiRandomBlob(256); std::vector> TestFiles; for (int I = 0; I < 10; ++I) { std::filesystem::path Rel = fmt::format("dup_{:02d}.bin"sv, I); WriteFile(H.ServerStateDir / Rel, Shared); TestFiles.emplace_back(Rel, Shared); } WriteFile(H.ServerStateDir / "other.bin", Other); TestFiles.emplace_back("other.bin", Other); const auto Config = H.MakeConfig("pack_duplicates"); H.Hydration->CreateHydrator(Config)->Dehydrate(CbObject()); H.Hydration->CreateHydrator(Config)->Hydrate(); VerifyTree(H.ServerStateDir, TestFiles); } TEST_CASE("hydration.file.pack_large_dataset") { // Mix of many small files + a few large ones, with a modest MaxPackBytes // to force bin-packing into multiple packs. Verifies ordering, splitting, and the // interaction between packed and standalone uploads. FileHarness H; std::vector> TestFiles; constexpr int kSmallCount = 100; constexpr int kLargeCount = 3; // Varied small-file sizes (256-2048 B) avoid artificial uniformity in the bin-pack. 
FastRandom Rand{.Seed = 0xcafebabe}; for (int I = 0; I < kSmallCount; ++I) { uint64_t Size = 256 + (Rand.Next() % 1793); // [256, 2048] IoBuffer Blob = CreateSemiRandomBlob(Rand, Size); auto Rel = std::filesystem::path(fmt::format("small/group{}/file_{:04d}.bin"sv, I / 25, I)); CreateDirectories((H.ServerStateDir / Rel).parent_path()); WriteFile(H.ServerStateDir / Rel, Blob); TestFiles.emplace_back(std::move(Rel), std::move(Blob)); } for (int I = 0; I < kLargeCount; ++I) { IoBuffer Blob = CreateSemiRandomBlob(Rand, 32 * 1024 + I * 4096); auto Rel = std::filesystem::path(fmt::format("large/file_{:02d}.bulk"sv, I)); CreateDirectories((H.ServerStateDir / Rel).parent_path()); WriteFile(H.ServerStateDir / Rel, Blob); TestFiles.emplace_back(std::move(Rel), std::move(Blob)); } // Cap each pack at ~32 KB -> 100 small files (~115 KB raw) split across ~4 packs. const auto Config = H.MakeConfig("pack_large", HydrationConfig{.MaxPackBytes = 32 * 1024}); H.Hydration->CreateHydrator(Config)->Dehydrate(CbObject()); CHECK(std::filesystem::is_empty(H.ServerStateDir)); H.Hydration->CreateHydrator(Config)->Hydrate(); VerifyTree(H.ServerStateDir, TestFiles); } TEST_CASE("hydration.file.pack_hash_determinism") { // Two independent dehydrate runs over the same content must produce byte-identical state // files (and therefore identical pack hashes). This is what keeps ExistsLookup dedup // working across redeploys. 
// Input tree: 40 files under tree/ whose sizes cycle through 7 values, generated from a fixed RNG seed.
    FileHarness H;
    FastRandom  Rand{.Seed = 0x12345678};

    std::vector> Files;
    for (int I = 0; I < 40; ++I)
    {
        IoBuffer Blob = CreateSemiRandomBlob(Rand, 256 + (I % 7) * 200);
        auto     Rel  = std::filesystem::path(fmt::format("tree/leaf_{:02d}.dat"sv, I));
        CreateDirectories((H.ServerStateDir / Rel).parent_path());
        WriteFile(H.ServerStateDir / Rel, Blob);
        Files.emplace_back(std::move(Rel), std::move(Blob));
    }

    const auto                  Config    = H.MakeConfig("pack_determinism");
    const std::filesystem::path StateFile = H.HydrationStore / "pack_determinism" / "current-state.cbo";

    // Extract the ordered pack-hash list from current-state.cbo. Timestamp / duration fields vary
    // across runs so byte-identity is not achievable; the pack identities are.
    // StateFile is a fixed path re-read on every call, so the result reflects whichever
    // dehydrate ran most recently.
    auto ReadPackHashes = [&]() -> std::vector {
        FileContents Contents = ReadFile(StateFile);
        REQUIRE(Contents);
        IoBuffer        Payload = Contents.Flatten();
        CbValidateError Err;
        CbObject        Meta = ValidateAndReadCompactBinaryObject(std::move(Payload), Err);
        REQUIRE_EQ(Err, CbValidateError::None);
        std::vector Hashes;
        for (CbFieldView F : Meta["Packs"sv])
        {
            Hashes.push_back(F.AsObjectView()["Hash"sv].AsHash());
        }
        return Hashes;
    };

    H.Hydration->CreateHydrator(Config)->Dehydrate(CbObject());
    std::vector First = ReadPackHashes();
    REQUIRE_FALSE(First.empty());

    // Rehydrate so the tree is back on disk, then dehydrate again with a fresh hydrator.
    H.Hydration->CreateHydrator(Config)->Hydrate();
    VerifyTree(H.ServerStateDir, Files);

    // A second, independently initialized Hydration over the same store; presumably this rules out
    // any in-process state carried over from the first instance influencing the pack layout.
    auto HydrationB = InitHydration({.TargetSpecification = "file://" + H.HydrationStore.string()});
    HydrationB->CreateHydrator(Config)->Dehydrate(CbObject());
    std::vector Second = ReadPackHashes();

    // Same pack count, and the same pack hash at every position (order matters).
    REQUIRE_EQ(First.size(), Second.size());
    for (size_t I = 0; I < First.size(); ++I)
    {
        CHECK_EQ(First[I], Second[I]);
    }
}

TEST_CASE("hydration.file.pack_backward_compat_read")
{
    // Old-format (pre-pack) state must still hydrate. The old-format state.cbo - no Packs[] /
    // PackHash fields - is not hand-crafted; it is produced below by dehydrating with
    // PackEnabled=false. Hydrating it with packing enabled must treat every file as standalone
    // and roundtrip successfully.
    FileHarness H;
    const auto  TestFiles = CreateSmallTestTree(H.ServerStateDir);
    const auto  Config    = H.MakeConfig("pack_oldformat", HydrationConfig{.PackEnabled = false});

    // Dehydrate with PackEnabled=false -> state.cbo has no Packs[] and no PackHash fields.
    H.Hydration->CreateHydrator(Config)->Dehydrate(CbObject());
    CHECK(std::filesystem::is_empty(H.ServerStateDir));

    // Hydrate with PackEnabled=true -> the hydrator must still handle the old-format state.
    const auto NewConfig = H.MakeConfig("pack_oldformat");
    H.Hydration->CreateHydrator(NewConfig)->Hydrate();
    VerifyTree(H.ServerStateDir, TestFiles);
}

// ---------------------------------------------------------------------------
// CreateParentDirectories helper test
// ---------------------------------------------------------------------------

// Exercises hydration_impl::CreateParentDirectories. The value checked is the number of distinct
// leaf parent directories after de-duplicating shared parents and pruning directories that are
// ancestors of another requested parent (7 files -> 4 leaves in the mixed case below).
TEST_CASE("hydration.createparentdirectories")
{
    ScopedTemporaryDirectory    TempDir;
    const std::filesystem::path Root = TempDir.Path();

    // Edge: empty input.
    CHECK_EQ(hydration_impl::CreateParentDirectories({}), 0u);

    // Edge: bare filename has no parent_path() -> contributes nothing.
    std::vector Bare{"bare.bin"};
    CHECK_EQ(hydration_impl::CreateParentDirectories(Bare), 0u);

    // Edge: single input. Triggers the Dirs.size() == 1 path that bypasses the prune loop.
    const std::filesystem::path SingleRoot = Root / "single";
    std::vector                 Single{SingleRoot / "only" / "a.bin"};
    CHECK_EQ(hydration_impl::CreateParentDirectories(Single), 1u);
    CHECK(std::filesystem::is_directory(SingleRoot / "only"));

    // Edge: pre-existing dirs must not raise; count still reflects leaf set.
    const std::filesystem::path PreRoot = Root / "preexisting";
    CreateDirectories(PreRoot / "pre" / "made");
    std::vector Pre{PreRoot / "pre" / "made" / "f.bin"};
    CHECK_EQ(hydration_impl::CreateParentDirectories(Pre), 1u);
    CHECK(std::filesystem::is_directory(PreRoot / "pre" / "made"));

    // Generic: ancestor-chain pruning, parent dedup across files in same dir, disjoint
    // siblings (cannot prune each other), nested-vs-flat coexistence. Expected leaves:
    // deep/nest/leaf, deep/sibling, flat, lone.
    const std::filesystem::path MixRoot = Root / "mixed";
    std::vector                 Mix{MixRoot / "deep" / "nest" / "leaf" / "x.bin",
                    MixRoot / "deep" / "nest" / "leaf" / "y.bin",  // shares parent with x
                    MixRoot / "deep" / "nest" / "g.bin",           // ancestor of leaf -> pruned
                    MixRoot / "deep" / "h.bin",                    // ancestor of nest -> pruned
                    MixRoot / "deep" / "sibling" / "i.bin",        // sibling of nest -> kept
                    MixRoot / "flat" / "j.bin",                    // top-level sibling -> kept
                    MixRoot / "lone" / "k.bin"};                   // disjoint -> kept
    CHECK_EQ(hydration_impl::CreateParentDirectories(Mix), 4u);
    CHECK(std::filesystem::is_directory(MixRoot / "deep" / "nest" / "leaf"));
    CHECK(std::filesystem::is_directory(MixRoot / "deep" / "sibling"));
    CHECK(std::filesystem::is_directory(MixRoot / "flat"));
    CHECK(std::filesystem::is_directory(MixRoot / "lone"));
    // Pruned ancestors still exist via CreateDirectories recursion.
    CHECK(std::filesystem::is_directory(MixRoot / "deep" / "nest"));
    CHECK(std::filesystem::is_directory(MixRoot / "deep"));
}

// ---------------------------------------------------------------------------
// FileHydrator concurrent test
// ---------------------------------------------------------------------------

TEST_CASE("hydration.file.concurrent")
{
    // N modules dehydrate and hydrate concurrently via ParallelWork.
    // Each module operates in its own directory - tests for global/static state races.
    constexpr int kModuleCount = 4;

    ScopedTemporaryDirectory TempDir;
    std::filesystem::path    HydrationStore = TempDir.Path() / "hydration_store";
    CreateDirectories(HydrationStore);

    TestThreading Threading(8);

    struct ModuleData
    {
        HydrationConfig Config;
        std::vector> Files;
    };
    std::vector Modules(kModuleCount);

    // One shared Hydration instance; every module goes through it concurrently.
    auto Hydration = InitHydration({.TargetSpecification = "file://" + HydrationStore.string()});

    for (int I = 0; I < kModuleCount; ++I)
    {
        std::string           ModuleId = fmt::format("file_concurrent_{}"sv, I);
        std::filesystem::path StateDir = TempDir.Path() / ModuleId / "state";
        std::filesystem::path TempPath = TempDir.Path() / ModuleId / "temp";
        CreateDirectories(StateDir);
        CreateDirectories(TempPath);

        Modules[I].Config.ServerStateDir = StateDir;
        Modules[I].Config.TempDir        = TempPath;
        Modules[I].Config.ModuleId       = ModuleId;
        Modules[I].Config.Threading      = Threading.Options;
        Modules[I].Files                 = CreateSmallTestTree(StateDir);
    }

    // Concurrent dehydrate
    {
        WorkerThreadPool  Pool(kModuleCount, "hydration_file_dehy");
        std::atomic AbortFlag{false};
        std::atomic PauseFlag{false};
        ParallelWork      Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog);

        for (int I = 0; I < kModuleCount; ++I)
        {
            // Reference captures are safe: Work.Wait() below blocks until every scheduled task
            // has finished, before Hydration / Modules leave scope.
            Work.ScheduleWork(Pool, [&Hydration, &Config = Modules[I].Config](std::atomic&) {
                Hydration->CreateHydrator(Config)->Dehydrate(CbObject());
            });
        }
        Work.Wait();
        CHECK_FALSE(Work.IsAborted());
    }

    // Concurrent hydrate
    // NOTE(review): no CleanDirectory before Hydrate here (unlike hydration.s3.concurrent);
    // presumably relies on Dehydrate having emptied each ServerStateDir - confirm.
    {
        WorkerThreadPool  Pool(kModuleCount, "hydration_file_hy");
        std::atomic AbortFlag{false};
        std::atomic PauseFlag{false};
        ParallelWork      Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog);

        for (int I = 0; I < kModuleCount; ++I)
        {
            Work.ScheduleWork(Pool, [&Hydration, &Config = Modules[I].Config](std::atomic&) {
                Hydration->CreateHydrator(Config)->Hydrate();
            });
        }
        Work.Wait();
        CHECK_FALSE(Work.IsAborted());
    }

    // Verify all modules restored correctly
    for (int I = 0; I < kModuleCount; ++I)
    {
        VerifyTree(Modules[I].Config.ServerStateDir, Modules[I].Files);
    }
}
//
--------------------------------------------------------------------------- // S3Hydrator tests // // Each test case spawns a local MinIO instance (self-contained, no external setup needed). // The MinIO binary must be present in the same directory as the test executable (copied by xmake). // --------------------------------------------------------------------------- TEST_CASE("hydration.s3.dehydrate_hydrate") { MinioProcessOptions MinioOpts; MinioOpts.Port = 19011; MinioProcess Minio(MinioOpts); Minio.SpawnMinioServer(); Minio.CreateBucket("zen-hydration-test"); ScopedEnvVar EnvAccessKey("AWS_ACCESS_KEY_ID", Minio.RootUser()); ScopedEnvVar EnvSecretKey("AWS_SECRET_ACCESS_KEY", Minio.RootPassword()); ScopedTemporaryDirectory TempDir; std::filesystem::path ServerStateDir = TempDir.Path() / "server_state"; std::filesystem::path HydrationTemp = TempDir.Path() / "hydration_temp"; CreateDirectories(ServerStateDir); CreateDirectories(HydrationTemp); HydrationBase::Configuration BaseConfig; { std::string ConfigJson = fmt::format(R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test","endpoint":"{}","path-style":true}}}})", Minio.Endpoint()); std::string ParseError; CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError); ZEN_ASSERT(ParseError.empty() && Root.IsObject()); BaseConfig.Options = std::move(Root).AsObject(); } auto Hydration = InitHydration(BaseConfig); HydrationConfig Config{.ServerStateDir = ServerStateDir, .TempDir = HydrationTemp, .ModuleId = "s3test_roundtrip"}; // Hydrate with no prior S3 state (first-boot path). Pre-populate ServerStateDir // with a stale file to confirm the cleanup branch wipes it. 
WriteFile(ServerStateDir / "stale.bin", CreateSemiRandomBlob(256)); Hydration->CreateHydrator(Config)->Hydrate(); CHECK(std::filesystem::is_empty(ServerStateDir)); // v1: dehydrate without a marker file CreateSmallTestTree(ServerStateDir); Hydration->CreateHydrator(Config)->Dehydrate(CbObject()); // v2: dehydrate WITH a marker file that only v2 has CreateSmallTestTree(ServerStateDir); WriteFile(ServerStateDir / "v2marker.bin", CreateSemiRandomBlob(64)); Hydration->CreateHydrator(Config)->Dehydrate(CbObject()); // Hydrate must restore v2 (the latest dehydrated state) CleanDirectory(ServerStateDir, true); Hydration->CreateHydrator(Config)->Hydrate(); // v2 marker must be present - confirms the second dehydration overwrote the first CHECK(std::filesystem::exists(ServerStateDir / "v2marker.bin")); CHECK(std::filesystem::exists(ServerStateDir / "subdir" / "file_b.bin")); CHECK(std::filesystem::exists(ServerStateDir / "subdir" / "nested" / "file_c.bin")); } TEST_CASE("hydration.s3.concurrent") { // N modules dehydrate and hydrate concurrently against MinIO. // Each module has a distinct ModuleId, so S3 key prefixes don't overlap. 
MinioProcessOptions MinioOpts; MinioOpts.Port = 19013; MinioProcess Minio(MinioOpts); Minio.SpawnMinioServer(); Minio.CreateBucket("zen-hydration-test"); ScopedEnvVar EnvAccessKey("AWS_ACCESS_KEY_ID", Minio.RootUser()); ScopedEnvVar EnvSecretKey("AWS_SECRET_ACCESS_KEY", Minio.RootPassword()); constexpr int kModuleCount = 6; constexpr int kThreadCount = 4; TestThreading Threading(kThreadCount); ScopedTemporaryDirectory TempDir; struct ModuleData { HydrationConfig Config; std::vector> Files; }; std::vector Modules(kModuleCount); HydrationBase::Configuration BaseConfig; { std::string ConfigJson = fmt::format(R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test","endpoint":"{}","path-style":true}}}})", Minio.Endpoint()); std::string ParseError; CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError); ZEN_ASSERT(ParseError.empty() && Root.IsObject()); BaseConfig.Options = std::move(Root).AsObject(); } auto Hydration = InitHydration(BaseConfig); for (int I = 0; I < kModuleCount; ++I) { std::string ModuleId = fmt::format("s3_concurrent_{}"sv, I); std::filesystem::path StateDir = TempDir.Path() / ModuleId / "state"; std::filesystem::path TempPath = TempDir.Path() / ModuleId / "temp"; CreateDirectories(StateDir); CreateDirectories(TempPath); Modules[I].Config.ServerStateDir = StateDir; Modules[I].Config.TempDir = TempPath; Modules[I].Config.ModuleId = ModuleId; Modules[I].Config.Threading = Threading.Options; Modules[I].Files = CreateTestTree(StateDir); } // Concurrent dehydrate { WorkerThreadPool Pool(kThreadCount, "hydration_s3_dehy"); std::atomic AbortFlag{false}; std::atomic PauseFlag{false}; ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); for (int I = 0; I < kModuleCount; ++I) { Work.ScheduleWork(Pool, [&Hydration, &Config = Modules[I].Config](std::atomic&) { Hydration->CreateHydrator(Config)->Dehydrate(CbObject()); }); } Work.Wait(); CHECK_FALSE(Work.IsAborted()); } // Concurrent hydrate { WorkerThreadPool 
Pool(kThreadCount, "hydration_s3_hy"); std::atomic AbortFlag{false}; std::atomic PauseFlag{false}; ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); for (int I = 0; I < kModuleCount; ++I) { Work.ScheduleWork(Pool, [&Hydration, &Config = Modules[I].Config](std::atomic&) { CleanDirectory(Config.ServerStateDir, true); Hydration->CreateHydrator(Config)->Hydrate(); }); } Work.Wait(); CHECK_FALSE(Work.IsAborted()); } // Verify all modules restored correctly for (int I = 0; I < kModuleCount; ++I) { VerifyTree(Modules[I].Config.ServerStateDir, Modules[I].Files); } } TEST_CASE("hydration.s3.obliterate") { MinioProcessOptions MinioOpts; MinioOpts.Port = 19019; MinioProcess Minio(MinioOpts); Minio.SpawnMinioServer(); Minio.CreateBucket("zen-hydration-test"); ScopedEnvVar EnvAccessKey("AWS_ACCESS_KEY_ID", Minio.RootUser()); ScopedEnvVar EnvSecretKey("AWS_SECRET_ACCESS_KEY", Minio.RootPassword()); ScopedTemporaryDirectory TempDir; std::filesystem::path ServerStateDir = TempDir.Path() / "server_state"; std::filesystem::path HydrationTemp = TempDir.Path() / "hydration_temp"; CreateDirectories(ServerStateDir); CreateDirectories(HydrationTemp); constexpr std::string_view ModuleId = "s3test_obliterate"sv; HydrationBase::Configuration BaseConfig; { std::string ConfigJson = fmt::format(R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test","endpoint":"{}","path-style":true}}}})", Minio.Endpoint()); std::string ParseError; CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError); ZEN_ASSERT(ParseError.empty() && Root.IsObject()); BaseConfig.Options = std::move(Root).AsObject(); } auto Hydration = InitHydration(BaseConfig); HydrationConfig Config{.ServerStateDir = ServerStateDir, .TempDir = HydrationTemp, .ModuleId = std::string(ModuleId)}; // Dehydrate to populate backend CreateSmallTestTree(ServerStateDir); Hydration->CreateHydrator(Config)->Dehydrate(CbObject()); auto ListModuleObjects = [&]() { S3ClientOptions Opts; 
Opts.BucketName = "zen-hydration-test"; Opts.Endpoint = Minio.Endpoint(); Opts.PathStyle = true; Opts.Credentials.AccessKeyId = Minio.RootUser(); Opts.Credentials.SecretAccessKey = Minio.RootPassword(); S3Client Client(Opts); return Client.ListObjects(fmt::format("{}/"sv, ModuleId)); }; // Verify objects exist in S3 CHECK(!ListModuleObjects().Objects.empty()); // Re-populate ServerStateDir and TempDir for cleanup verification CreateSmallTestTree(ServerStateDir); WriteFile(HydrationTemp / "leftover.tmp", CreateSemiRandomBlob(64)); // Obliterate Hydration->CreateHydrator(Config)->Obliterate(); // Verify S3 objects deleted CHECK(ListModuleObjects().Objects.empty()); // Local directories cleaned CHECK(std::filesystem::is_empty(ServerStateDir)); CHECK(std::filesystem::is_empty(HydrationTemp)); } TEST_CASE("hydration.s3.config_overrides") { MinioProcessOptions MinioOpts; MinioOpts.Port = 19015; MinioProcess Minio(MinioOpts); Minio.SpawnMinioServer(); Minio.CreateBucket("zen-hydration-test"); ScopedEnvVar EnvAccessKey("AWS_ACCESS_KEY_ID", Minio.RootUser()); ScopedEnvVar EnvSecretKey("AWS_SECRET_ACCESS_KEY", Minio.RootPassword()); ScopedTemporaryDirectory TempDir; std::filesystem::path ServerStateDir = TempDir.Path() / "server_state"; std::filesystem::path HydrationTemp = TempDir.Path() / "hydration_temp"; CreateDirectories(ServerStateDir); CreateDirectories(HydrationTemp); // Path prefix: "s3://bucket/some/prefix" stores objects under // "some/prefix//..." rather than directly under "/...". 
{ auto TestFiles = CreateSmallTestTree(ServerStateDir); HydrationBase::Configuration BaseConfig; { std::string ConfigJson = fmt::format( R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test/team/project","endpoint":"{}","path-style":true}}}})", Minio.Endpoint()); std::string ParseError; CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError); ZEN_ASSERT(ParseError.empty() && Root.IsObject()); BaseConfig.Options = std::move(Root).AsObject(); } auto Hydration = InitHydration(BaseConfig); HydrationConfig Config{.ServerStateDir = ServerStateDir, .TempDir = HydrationTemp, .ModuleId = "s3test_prefix"}; Hydration->CreateHydrator(Config)->Dehydrate(CbObject()); CleanDirectory(ServerStateDir, true); Hydration->CreateHydrator(Config)->Hydrate(); VerifyTree(ServerStateDir, TestFiles); } // Region override: 'region' in Options["settings"] takes precedence over AWS_DEFAULT_REGION. // AWS_DEFAULT_REGION is set to a bogus value; hydration must succeed using the region from Options. 
{ CleanDirectory(ServerStateDir, true); auto TestFiles = CreateSmallTestTree(ServerStateDir); ScopedEnvVar EnvRegion("AWS_DEFAULT_REGION", "wrong-region"); HydrationBase::Configuration BaseConfig; { std::string ConfigJson = fmt::format( R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test","endpoint":"{}","path-style":true,"region":"us-east-1"}}}})", Minio.Endpoint()); std::string ParseError; CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError); ZEN_ASSERT(ParseError.empty() && Root.IsObject()); BaseConfig.Options = std::move(Root).AsObject(); } auto Hydration = InitHydration(BaseConfig); HydrationConfig Config{.ServerStateDir = ServerStateDir, .TempDir = HydrationTemp, .ModuleId = "s3test_region_override"}; Hydration->CreateHydrator(Config)->Dehydrate(CbObject()); CleanDirectory(ServerStateDir, true); Hydration->CreateHydrator(Config)->Hydrate(); VerifyTree(ServerStateDir, TestFiles); } } TEST_CASE("hydration.s3.dehydrate_hydrate.performance" * doctest::skip()) { MinioProcessOptions MinioOpts; MinioOpts.Port = 19010; MinioProcess Minio(MinioOpts); Minio.SpawnMinioServer(); Minio.CreateBucket("zen-hydration-test"); ScopedEnvVar EnvAccessKey("AWS_ACCESS_KEY_ID", Minio.RootUser()); ScopedEnvVar EnvSecretKey("AWS_SECRET_ACCESS_KEY", Minio.RootPassword()); ScopedTemporaryDirectory TempDir; std::filesystem::path ServerStateDir = TempDir.Path() / "server_state"; std::filesystem::path HydrationTemp = TempDir.Path() / "hydration_temp"; CreateDirectories(ServerStateDir); CreateDirectories(HydrationTemp); constexpr std::string_view ModuleId = "s3test_performance"sv; CopyTree("E:\\Dev\\hub\\brainrot\\20260402-225355-508", ServerStateDir, {.EnableClone = true}); // auto TestFiles = CreateTestTree(ServerStateDir); TestThreading Threading(4); HydrationBase::Configuration BaseConfig; { std::string ConfigJson = fmt::format(R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test","endpoint":"{}","path-style":true}}}})", Minio.Endpoint()); 
std::string ParseError; CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError); ZEN_ASSERT(ParseError.empty() && Root.IsObject()); BaseConfig.Options = std::move(Root).AsObject(); } auto Hydration = InitHydration(BaseConfig); HydrationConfig Config{.ServerStateDir = ServerStateDir, .TempDir = HydrationTemp, .ModuleId = std::string(ModuleId), .Threading = Threading.Options}; // Dehydrate: upload server state to MinIO ZEN_INFO("============== DEHYDRATE =============="); Hydration->CreateHydrator(Config)->Dehydrate(CbObject()); for (size_t I = 0; I < 1; I++) { // Wipe server state CleanDirectory(ServerStateDir, true); CHECK(std::filesystem::is_empty(ServerStateDir)); // Hydrate: download from MinIO back to server state ZEN_INFO("=============== HYDRATE ==============="); Hydration->CreateHydrator(Config)->Hydrate(); } } //#define REAL_DATA_PATH "E:\\Dev\\hub\\zenddc\\Zen" //#define REAL_DATA_PATH "E:\\Dev\\hub\\brainrot\\20260402-225355-508" TEST_CASE("hydration.file.incremental") { std::filesystem::path TmpPath; # ifdef REAL_DATA_PATH TmpPath = std::filesystem::path(REAL_DATA_PATH).parent_path() / "hub"; # endif ScopedTemporaryDirectory TempDir(TmpPath); std::filesystem::path ServerStateDir = TempDir.Path() / "server_state"; std::filesystem::path HydrationStore = TempDir.Path() / "hydration_store"; std::filesystem::path HydrationTemp = TempDir.Path() / "hydration_temp"; CreateDirectories(ServerStateDir); CreateDirectories(HydrationStore); CreateDirectories(HydrationTemp); constexpr std::string_view ModuleId = "testmodule"sv; // auto TestFiles = CreateTestTree(ServerStateDir); TestThreading Threading(4); auto Hydration = InitHydration({.TargetSpecification = "file://" + HydrationStore.string()}); HydrationConfig Config{.ServerStateDir = ServerStateDir, .TempDir = HydrationTemp, .ModuleId = std::string(ModuleId), .Threading = Threading.Options}; // Hydrate with no prior state CbObject HydrationState = Hydration->CreateHydrator(Config)->Hydrate(); 
CHECK_FALSE(HydrationState); # ifdef REAL_DATA_PATH ZEN_INFO("Writing state data..."); CopyTree(REAL_DATA_PATH, ServerStateDir, {.EnableClone = true}); ZEN_INFO("Writing state data complete"); # else // Create test files and dehydrate auto TestFiles = CreateTestTree(ServerStateDir); # endif Hydration->CreateHydrator(Config)->Dehydrate(HydrationState); CHECK(std::filesystem::is_empty(ServerStateDir)); // Hydrate: restore from file store HydrationState = Hydration->CreateHydrator(Config)->Hydrate(); # ifndef REAL_DATA_PATH VerifyTree(ServerStateDir, TestFiles); # endif // Dehydrate again with cached state (should skip re-uploading unchanged files) Hydration->CreateHydrator(Config)->Dehydrate(HydrationState); CHECK(std::filesystem::is_empty(ServerStateDir)); // Hydrate one more time to confirm second dehydrate produced valid state HydrationState = Hydration->CreateHydrator(Config)->Hydrate(); // Replace files and dehydrate TestFiles = CreateTestTree(ServerStateDir); Hydration->CreateHydrator(Config)->Dehydrate(HydrationState); // Hydrate one more time to confirm second dehydrate produced valid state HydrationState = Hydration->CreateHydrator(Config)->Hydrate(); # ifndef REAL_DATA_PATH VerifyTree(ServerStateDir, TestFiles); # endif // 0 // Dehydrate, nothing touched - no hashing, no upload Hydration->CreateHydrator(Config)->Dehydrate(HydrationState); } // --------------------------------------------------------------------------- // S3Storage test // --------------------------------------------------------------------------- TEST_CASE("hydration.s3.incremental") { MinioProcessOptions MinioOpts; MinioOpts.Port = 19017; MinioProcess Minio(MinioOpts); Minio.SpawnMinioServer(); Minio.CreateBucket("zen-hydration-test"); ScopedEnvVar EnvAccessKey("AWS_ACCESS_KEY_ID", Minio.RootUser()); ScopedEnvVar EnvSecretKey("AWS_SECRET_ACCESS_KEY", Minio.RootPassword()); std::filesystem::path TmpPath; # ifdef REAL_DATA_PATH TmpPath = std::filesystem::path(REAL_DATA_PATH).parent_path() / 
"hub"; # endif ScopedTemporaryDirectory TempDir(TmpPath); std::filesystem::path ServerStateDir = TempDir.Path() / "server_state"; std::filesystem::path HydrationTemp = TempDir.Path() / "hydration_temp"; CreateDirectories(ServerStateDir); CreateDirectories(HydrationTemp); constexpr std::string_view ModuleId = "s3test_incremental"sv; TestThreading Threading(8); HydrationBase::Configuration BaseConfig; { std::string ConfigJson = fmt::format(R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test","endpoint":"{}","path-style":true}}}})", Minio.Endpoint()); std::string ParseError; CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError); ZEN_ASSERT(ParseError.empty() && Root.IsObject()); BaseConfig.Options = std::move(Root).AsObject(); } auto Hydration = InitHydration(BaseConfig); HydrationConfig Config{.ServerStateDir = ServerStateDir, .TempDir = HydrationTemp, .ModuleId = std::string(ModuleId), .Threading = Threading.Options}; // Hydrate with no prior state CbObject HydrationState = Hydration->CreateHydrator(Config)->Hydrate(); CHECK_FALSE(HydrationState); # ifdef REAL_DATA_PATH ZEN_INFO("Writing state data..."); CopyTree(REAL_DATA_PATH, ServerStateDir, {.EnableClone = true}); ZEN_INFO("Writing state data complete"); # else // Create test files and dehydrate auto TestFiles = CreateTestTree(ServerStateDir); # endif Hydration->CreateHydrator(Config)->Dehydrate(HydrationState); CHECK(std::filesystem::is_empty(ServerStateDir)); // Hydrate: restore from S3 HydrationState = Hydration->CreateHydrator(Config)->Hydrate(); # ifndef REAL_DATA_PATH VerifyTree(ServerStateDir, TestFiles); # endif // Dehydrate again with cached state (should skip re-uploading unchanged files) Hydration->CreateHydrator(Config)->Dehydrate(HydrationState); CHECK(std::filesystem::is_empty(ServerStateDir)); // Hydrate one more time to confirm second dehydrate produced valid state HydrationState = Hydration->CreateHydrator(Config)->Hydrate(); // Replace files and dehydrate TestFiles = 
CreateTestTree(ServerStateDir); Hydration->CreateHydrator(Config)->Dehydrate(HydrationState); // Hydrate one more time to confirm second dehydrate produced valid state HydrationState = Hydration->CreateHydrator(Config)->Hydrate(); # ifndef REAL_DATA_PATH VerifyTree(ServerStateDir, TestFiles); # endif // 0 // Dehydrate, nothing touched - no hashing, no upload Hydration->CreateHydrator(Config)->Dehydrate(HydrationState); } TEST_CASE("hydration.create_hydrator_rejects_invalid_config") { // Unknown TargetSpecification prefix CHECK_THROWS(InitHydration({.TargetSpecification = "ftp://somewhere"})); // Unknown Options type { std::string ParseError; CbFieldIterator Root = LoadCompactBinaryFromJson(R"({"type":"dynamodb"})", ParseError); ZEN_ASSERT(ParseError.empty() && Root.IsObject()); CHECK_THROWS(InitHydration({.Options = std::move(Root).AsObject()})); } // Empty Options (no type field) CHECK_THROWS(InitHydration({})); } TEST_SUITE_END(); void hydration_forcelink() { } #endif // ZEN_WITH_TESTS } // namespace zen