diff options
| author | Stefan Boberg <[email protected]> | 2026-04-23 18:16:57 +0200 |
|---|---|---|
| committer | Stefan Boberg <[email protected]> | 2026-04-23 18:16:57 +0200 |
| commit | 0232b991cd7d8e3a2114ea30e4591dd3e7b65c36 (patch) | |
| tree | 94730e7594fd09ae1fa820391ce311f6daf13905 /src/zenserver/hub/hydration.cpp | |
| parent | Fix forward declaration order for s_GotSigWinch and SigWinchHandler (diff) | |
| parent | trace: declare Region event name fields as AnsiString (#1012) (diff) | |
| download | archived-zen-sb/zen-help.tar.xz archived-zen-sb/zen-help.zip | |
Merge branch 'main' into sb/zen-help (branch: sb/zen-help)
- Combine HelpCommand (this branch) with HistoryCommand (main) in zen CLI dispatcher
- Keep filter-aware TuiPickOne rewrite; adopt main's ASCII arrow glyphs in doc comment
Diffstat (limited to 'src/zenserver/hub/hydration.cpp')
| -rw-r--r-- | src/zenserver/hub/hydration.cpp | 2420 |
1 file changed, 1634 insertions, 786 deletions
diff --git a/src/zenserver/hub/hydration.cpp b/src/zenserver/hub/hydration.cpp index ed16bfe56..c7f25bab6 100644 --- a/src/zenserver/hub/hydration.cpp +++ b/src/zenserver/hub/hydration.cpp @@ -5,24 +5,28 @@ #include <zencore/basicfile.h> #include <zencore/compactbinary.h> #include <zencore/compactbinarybuilder.h> +#include <zencore/compactbinaryutil.h> +#include <zencore/compress.h> #include <zencore/except_fmt.h> #include <zencore/filesystem.h> #include <zencore/fmtutils.h> #include <zencore/logging.h> +#include <zencore/parallelwork.h> +#include <zencore/stream.h> #include <zencore/system.h> +#include <zencore/thread.h> #include <zencore/timer.h> #include <zenutil/cloud/imdscredentials.h> #include <zenutil/cloud/s3client.h> +#include <zenutil/filesystemutils.h> -ZEN_THIRD_PARTY_INCLUDES_START -#include <json11.hpp> -ZEN_THIRD_PARTY_INCLUDES_END +#include <numeric> +#include <unordered_map> +#include <unordered_set> #if ZEN_WITH_TESTS -# include <zencore/parallelwork.h> # include <zencore/testing.h> # include <zencore/testutils.h> -# include <zencore/thread.h> # include <zencore/workthreadpool.h> # include <zenutil/cloud/minioprocess.h> # include <cstring> @@ -30,7 +34,7 @@ ZEN_THIRD_PARTY_INCLUDES_END namespace zen { -namespace { +namespace hydration_impl { /// UTC time decomposed to calendar fields with sub-second milliseconds. 
struct UtcTime @@ -56,597 +60,1343 @@ namespace { } }; -} // namespace + std::filesystem::path FastRelativePath(const std::filesystem::path& Root, const std::filesystem::path& Abs) + { + auto [_, ItAbs] = std::mismatch(Root.begin(), Root.end(), Abs.begin(), Abs.end()); + std::filesystem::path RelativePath; + for (auto I = ItAbs; I != Abs.end(); I++) + { + RelativePath = RelativePath / *I; + } + return RelativePath; + } -/////////////////////////////////////////////////////////////////////////// + void CleanDirectory(WorkerThreadPool& WorkerPool, + std::atomic<bool>& AbortFlag, + std::atomic<bool>& PauseFlag, + const std::filesystem::path& Path) + { + CleanDirectory(WorkerPool, AbortFlag, PauseFlag, Path, std::vector<std::string>{}, {}, 0); + } -constexpr std::string_view FileHydratorPrefix = "file://"; -constexpr std::string_view FileHydratorType = "file"; + /////////////////////////////////////////////////////////////////////// + // Hydration / dehydration statistics. Atomics so they are safe to update + // from parallel worker lambdas. Summary is emitted once after the operation + // completes (success or failure). 
-struct FileHydrator : public HydrationStrategyBase -{ - virtual void Configure(const HydrationConfig& Config) override; - virtual void Hydrate() override; - virtual void Dehydrate() override; + struct PhaseStats + { + std::atomic<uint64_t> Files{0}; // host-side: count of work scheduled in this phase + std::atomic<uint64_t> Bytes{0}; // lambda-side: bytes transferred on successful completion + std::atomic<uint64_t> ElapsedUs{0}; // wall time around Work.Wait() -private: - HydrationConfig m_Config; - std::filesystem::path m_StorageModuleRootDir; -}; + RwLock ThreadIdsLock; + std::unordered_set<int> ThreadIds; -void -FileHydrator::Configure(const HydrationConfig& Config) -{ - m_Config = Config; + void RecordThread() + { + int Tid = zen::GetCurrentThreadId(); + ThreadIdsLock.WithExclusiveLock([&] { ThreadIds.insert(Tid); }); + } + }; - std::filesystem::path ConfigPath; - if (!m_Config.TargetSpecification.empty()) + struct DehydrateStatistics { - ConfigPath = Utf8ToWide(m_Config.TargetSpecification.substr(FileHydratorPrefix.length())); - } - else + PhaseStats Hash; + PhaseStats Upload; + PhaseStats Touch; // Touch shares Upload's ParallelWork / ElapsedUs + + std::atomic<uint64_t> LoadStateUs{0}; + std::atomic<uint64_t> DirScanUs{0}; + std::atomic<uint64_t> ListExistingUs{0}; + std::atomic<uint64_t> MetadataSaveUs{0}; + std::atomic<uint64_t> CleanUs{0}; + + std::atomic<uint64_t> TotalFiles{0}; + std::atomic<uint64_t> TotalBytes{0}; + std::atomic<uint64_t> TotalUs{0}; + }; + + struct HydrateStatistics { - CbObjectView Settings = m_Config.Options["settings"].AsObjectView(); - std::string_view Path = Settings["path"].AsString(); - if (Path.empty()) - { - throw zen::runtime_error("Hydration config 'file' type requires 'settings.path'"); - } - ConfigPath = Utf8ToWide(std::string(Path)); - } - MakeSafeAbsolutePathInPlace(ConfigPath); + PhaseStats Download; - if (!std::filesystem::exists(ConfigPath)) + std::atomic<uint64_t> LoadMetadataUs{0}; + std::atomic<uint64_t> 
CleanUs{0}; + std::atomic<uint64_t> RenameOrCopyUs{0}; + std::atomic<uint64_t> VerifyScanUs{0}; + + std::atomic<uint64_t> TotalFiles{0}; + std::atomic<uint64_t> TotalBytes{0}; + std::atomic<uint64_t> TotalUs{0}; + }; + + // Bits-per-second rate computed at microsecond precision. Zero-safe. + inline uint64_t BitsPerSecond(uint64_t Bytes, uint64_t ElapsedUs) { - throw std::invalid_argument(fmt::format("Target does not exist: '{}'", ConfigPath.string())); + if (ElapsedUs == 0) + { + return 0; + } + return Bytes * 8 * 1'000'000ull / ElapsedUs; } - m_StorageModuleRootDir = ConfigPath / m_Config.ModuleId; + /////////////////////////////////////////////////////////////////////// + // Per-module storage interface driven by IncrementalHydrator. + + class StorageBase + { + public: + virtual ~StorageBase() = default; + + virtual std::string Describe() const = 0; + virtual void SaveMetadata(const CbObject& Data) = 0; + virtual CbObject LoadMetadata() = 0; + virtual CbObject GetSettings() = 0; + virtual void ParseSettings(const CbObjectView& Settings) = 0; + virtual std::vector<IoHash> List() = 0; + virtual void Put(ParallelWork& Work, + WorkerThreadPool& WorkerPool, + const IoHash& Hash, + uint64_t Size, + const std::filesystem::path& SourcePath, + PhaseStats& Stats) = 0; + virtual void Get(ParallelWork& Work, + WorkerThreadPool& WorkerPool, + const IoHash& Hash, + uint64_t Size, + const std::filesystem::path& DestinationPath, + PhaseStats& Stats) = 0; + virtual void Touch(ParallelWork& Work, WorkerThreadPool& WorkerPool, const IoHash& Hash, PhaseStats& Stats) = 0; + virtual void Delete(ParallelWork& Work, WorkerThreadPool& WorkerPool) = 0; + }; - CreateDirectories(m_StorageModuleRootDir); -} + class FileStorage : public StorageBase + { + public: + static constexpr std::string_view Prefix = "file://"; + static constexpr std::string_view Type = "file"; + + explicit FileStorage(std::filesystem::path ModulePath); + + virtual std::string Describe() const override { return 
fmt::format("file://{}", m_StoragePath.generic_string()); } + virtual void SaveMetadata(const CbObject& Data) override; + virtual CbObject LoadMetadata() override; + virtual CbObject GetSettings() override { return {}; } + virtual void ParseSettings(const CbObjectView&) override {} + virtual std::vector<IoHash> List() override; + virtual void Put(ParallelWork& Work, + WorkerThreadPool& WorkerPool, + const IoHash& Hash, + uint64_t Size, + const std::filesystem::path& SourcePath, + PhaseStats& Stats) override; + virtual void Get(ParallelWork& Work, + WorkerThreadPool& WorkerPool, + const IoHash& Hash, + uint64_t Size, + const std::filesystem::path& DestinationPath, + PhaseStats& Stats) override; + virtual void Touch(ParallelWork&, WorkerThreadPool&, const IoHash&, PhaseStats&) override {} + virtual void Delete(ParallelWork& Work, WorkerThreadPool& WorkerPool) override; + + private: + std::filesystem::path m_StoragePath; + std::filesystem::path m_StatePathName; + std::filesystem::path m_CASPath; + }; -void -FileHydrator::Hydrate() -{ - ZEN_INFO("Hydrating state from '{}' to '{}'", m_StorageModuleRootDir, m_Config.ServerStateDir); + class S3Storage : public StorageBase + { + public: + static constexpr std::string_view Prefix = "s3://"; + static constexpr std::string_view Type = "s3"; + static constexpr uint64_t DefaultMultipartChunkSize = 32u * 1024u * 1024u; + + S3Storage(S3Client& Client, std::string KeyPrefix, std::filesystem::path TempDir, uint64_t MultipartChunkSize); + + virtual std::string Describe() const override { return fmt::format("s3://{}/{}", m_Client.BucketName(), m_KeyPrefix); } + virtual void SaveMetadata(const CbObject& Data) override; + virtual CbObject LoadMetadata() override; + virtual CbObject GetSettings() override; + virtual void ParseSettings(const CbObjectView& Settings) override; + virtual std::vector<IoHash> List() override; + virtual void Put(ParallelWork& Work, + WorkerThreadPool& WorkerPool, + const IoHash& Hash, + uint64_t Size, + const 
std::filesystem::path& SourcePath, + PhaseStats& Stats) override; + virtual void Get(ParallelWork& Work, + WorkerThreadPool& WorkerPool, + const IoHash& Hash, + uint64_t Size, + const std::filesystem::path& DestinationPath, + PhaseStats& Stats) override; + virtual void Touch(ParallelWork& Work, WorkerThreadPool& WorkerPool, const IoHash& Hash, PhaseStats& Stats) override; + virtual void Delete(ParallelWork& Work, WorkerThreadPool& WorkerPool) override; + + private: + S3Client& m_Client; + std::string m_KeyPrefix; + std::filesystem::path m_TempDir; + uint64_t m_MultipartChunkSize; + }; - Stopwatch Timer; + /////////////////////////////////////////////////////////////////////// + // FileStorage implementations - // Ensure target is clean - ZEN_DEBUG("Wiping server state at '{}'", m_Config.ServerStateDir); - const bool ForceRemoveReadOnlyFiles = true; - CleanDirectory(m_Config.ServerStateDir, ForceRemoveReadOnlyFiles); + FileStorage::FileStorage(std::filesystem::path ModulePath) : m_StoragePath(std::move(ModulePath)) + { + MakeSafeAbsolutePathInPlace(m_StoragePath); + m_StatePathName = m_StoragePath / "current-state.cbo"; + m_CASPath = m_StoragePath / "cas"; + CreateDirectories(m_CASPath); + } - bool WipeServerState = false; + void FileStorage::SaveMetadata(const CbObject& Data) + { + BinaryWriter Output; + SaveCompactBinary(Output, Data); + WriteFile(m_StatePathName, IoBuffer(IoBuffer::Wrap, Output.GetData(), Output.GetSize())); + } - try + CbObject FileStorage::LoadMetadata() { - ZEN_DEBUG("Copying '{}' to '{}'", m_StorageModuleRootDir, m_Config.ServerStateDir); - CopyTree(m_StorageModuleRootDir, m_Config.ServerStateDir, {.EnableClone = true}); + if (!IsFile(m_StatePathName)) + { + return {}; + } + FileContents Content = ReadFile(m_StatePathName); + if (Content.ErrorCode) + { + ThrowSystemError(Content.ErrorCode.value(), "Failed to read state file"); + } + IoBuffer Payload = Content.Flatten(); + CbValidateError Error; + CbObject Result = 
ValidateAndReadCompactBinaryObject(std::move(Payload), Error); + if (Error != CbValidateError::None) + { + throw std::runtime_error(fmt::format("Failed to read {} state file. Reason: {}", m_StatePathName, ToString(Error))); + } + return Result; } - catch (std::exception& Ex) + + std::vector<IoHash> FileStorage::List() { - ZEN_WARN("Copy failed: {}. Will wipe any partially copied state from '{}'", Ex.what(), m_Config.ServerStateDir); + DirectoryContent DirContent; + GetDirectoryContent(m_CASPath, DirectoryContentFlags::IncludeFiles, DirContent); + std::vector<IoHash> Result; + Result.reserve(DirContent.Files.size()); + for (const std::filesystem::path& Path : DirContent.Files) + { + IoHash Hash; + if (IoHash::TryParse(Path.filename().string(), Hash)) + { + Result.push_back(Hash); + } + } + return Result; + } + + void FileStorage::Put(ParallelWork& Work, + WorkerThreadPool& WorkerPool, + const IoHash& Hash, + uint64_t Size, + const std::filesystem::path& SourcePath, + PhaseStats& Stats) + { + Work.ScheduleWork( + WorkerPool, + [this, Hash = IoHash(Hash), Size, SourcePath = std::filesystem::path(SourcePath), &Stats](std::atomic<bool>& AbortFlag) { + Stats.RecordThread(); + if (!AbortFlag.load()) + { + std::filesystem::path DestPath = m_CASPath / fmt::format("{}", Hash); + if (std::error_code Ec = CopyFile(SourcePath, DestPath, CopyFileOptions{.EnableClone = true}); Ec) + { + throw std::system_error(Ec, fmt::format("Failed to copy '{}' to '{}'", SourcePath, DestPath)); + } + Stats.Bytes.fetch_add(Size, std::memory_order_relaxed); + } + }); + } - // We don't do the clean right here to avoid potentially running into double-throws - WipeServerState = true; + void FileStorage::Get(ParallelWork& Work, + WorkerThreadPool& WorkerPool, + const IoHash& Hash, + uint64_t Size, + const std::filesystem::path& DestinationPath, + PhaseStats& Stats) + { + Work.ScheduleWork(WorkerPool, + [this, Hash = IoHash(Hash), Size, DestinationPath = std::filesystem::path(DestinationPath), 
&Stats]( + std::atomic<bool>& AbortFlag) { + Stats.RecordThread(); + if (!AbortFlag.load()) + { + std::filesystem::path SourcePath = m_CASPath / fmt::format("{}", Hash); + if (std::error_code Ec = CopyFile(SourcePath, DestinationPath, CopyFileOptions{.EnableClone = true}); Ec) + { + throw std::system_error(Ec, fmt::format("Failed to copy '{}' to '{}'", SourcePath, DestinationPath)); + } + Stats.Bytes.fetch_add(Size, std::memory_order_relaxed); + } + }); } - if (WipeServerState) + void FileStorage::Delete(ParallelWork& Work, WorkerThreadPool& WorkerPool) { - ZEN_DEBUG("Cleaning server state '{}'", m_Config.ServerStateDir); - CleanDirectory(m_Config.ServerStateDir, ForceRemoveReadOnlyFiles); + ZEN_UNUSED(Work); + ZEN_UNUSED(WorkerPool); + DeleteDirectories(m_StoragePath); } - else + + /////////////////////////////////////////////////////////////////////// + // S3Storage implementations + + S3Storage::S3Storage(S3Client& Client, std::string KeyPrefix, std::filesystem::path TempDir, uint64_t MultipartChunkSize) + : m_Client(Client) + , m_KeyPrefix(std::move(KeyPrefix)) + , m_TempDir(std::move(TempDir)) + , m_MultipartChunkSize(MultipartChunkSize) { - ZEN_INFO("Hydration complete in {}", NiceTimeSpanMs(Timer.GetElapsedTimeMs())); } -} -void -FileHydrator::Dehydrate() -{ - ZEN_INFO("Dehydrating state from '{}' to '{}'", m_Config.ServerStateDir, m_StorageModuleRootDir); + void S3Storage::SaveMetadata(const CbObject& Data) + { + BinaryWriter Output; + SaveCompactBinary(Output, Data); + IoBuffer Payload(IoBuffer::Clone, Output.GetData(), Output.GetSize()); - Stopwatch Timer; + std::string Key = m_KeyPrefix + "/incremental-state.cbo"; + S3Result Result = m_Client.PutObject(Key, std::move(Payload)); + if (!Result.IsSuccess()) + { + throw zen::runtime_error("Failed to save incremental metadata to '{}': {}", Key, Result.Error); + } + } - const std::filesystem::path TargetDir = m_StorageModuleRootDir; + CbObject S3Storage::LoadMetadata() + { + std::string Key = m_KeyPrefix + 
"/incremental-state.cbo"; + S3GetObjectResult Result = m_Client.GetObject(Key); + if (!Result.IsSuccess()) + { + if (Result.Error == S3GetObjectResult::NotFoundErrorText) + { + return {}; + } + throw zen::runtime_error("Failed to load incremental metadata from '{}': {}", Key, Result.Error); + } - // Ensure target is clean. This could be replaced with an atomic copy at a later date - // (i.e copy into a temporary directory name and rename it once complete) + CbValidateError Error; + CbObject Meta = ValidateAndReadCompactBinaryObject(std::move(Result.Content), Error); + if (Error != CbValidateError::None) + { + throw zen::runtime_error("Failed to parse incremental metadata from '{}': {}", Key, ToString(Error)); + } + return Meta; + } - ZEN_DEBUG("Cleaning storage root '{}'", TargetDir); - const bool ForceRemoveReadOnlyFiles = true; - CleanDirectory(TargetDir, ForceRemoveReadOnlyFiles); + CbObject S3Storage::GetSettings() + { + CbObjectWriter Writer; + Writer << "MultipartChunkSize" << m_MultipartChunkSize; + return Writer.Save(); + } - bool CopySuccess = true; + void S3Storage::ParseSettings(const CbObjectView& Settings) + { + m_MultipartChunkSize = Settings["MultipartChunkSize"].AsUInt64(S3Storage::DefaultMultipartChunkSize); + } - try + std::vector<IoHash> S3Storage::List() { - ZEN_DEBUG("Copying '{}' to '{}'", m_Config.ServerStateDir, TargetDir); - for (const std::filesystem::directory_entry& Entry : std::filesystem::directory_iterator(m_Config.ServerStateDir)) + std::string CasPrefix = m_KeyPrefix + "/cas/"; + S3ListObjectsResult Result = m_Client.ListObjects(CasPrefix); + if (!Result.IsSuccess()) + { + throw zen::runtime_error("Failed to list S3 objects under '{}': {}", CasPrefix, Result.Error); + } + + std::vector<IoHash> Hashes; + Hashes.reserve(Result.Objects.size()); + for (const S3ObjectInfo& Obj : Result.Objects) { - if (Entry.path().filename() == ".sentry-native") + size_t LastSlash = Obj.Key.rfind('/'); + if (LastSlash == std::string::npos) { continue; } 
- std::filesystem::path Dest = TargetDir / Entry.path().filename(); - if (Entry.is_directory()) + IoHash Hash; + if (IoHash::TryParse(Obj.Key.substr(LastSlash + 1), Hash)) { - CreateDirectories(Dest); - CopyTree(Entry.path(), Dest, {.EnableClone = true}); - } - else - { - CopyFile(Entry.path(), Dest, {.EnableClone = true}); + Hashes.push_back(Hash); } } + return Hashes; } - catch (std::exception& Ex) - { - ZEN_WARN("Copy failed: {}. Will wipe any partially copied state from '{}'", Ex.what(), m_StorageModuleRootDir); - // We don't do the clean right here to avoid potentially running into double-throws - CopySuccess = false; - } + void S3Storage::Put(ParallelWork& Work, + WorkerThreadPool& WorkerPool, + const IoHash& Hash, + uint64_t Size, + const std::filesystem::path& SourcePath, + PhaseStats& Stats) + { + Work.ScheduleWork( + WorkerPool, + [this, Hash = IoHash(Hash), Size, SourcePath = std::filesystem::path(SourcePath), &Stats](std::atomic<bool>& AbortFlag) { + Stats.RecordThread(); + if (AbortFlag.load()) + { + return; + } + S3Client& Client = m_Client; + std::string Key = m_KeyPrefix + "/cas/" + fmt::format("{}", Hash); - if (!CopySuccess) - { - ZEN_DEBUG("Removing partially copied state from '{}'", TargetDir); - CleanDirectory(TargetDir, ForceRemoveReadOnlyFiles); + if (Size >= (m_MultipartChunkSize + (m_MultipartChunkSize / 4))) + { + BasicFile File(SourcePath, BasicFile::Mode::kRead); + S3Result Result = Client.PutObjectMultipart( + Key, + Size, + [&File](uint64_t Offset, uint64_t ChunkSize) { return File.ReadRange(Offset, ChunkSize); }, + m_MultipartChunkSize); + if (!Result.IsSuccess()) + { + throw zen::runtime_error("Failed to upload '{}' to S3: {}", Key, Result.Error); + } + } + else + { + BasicFile File(SourcePath, BasicFile::Mode::kRead); + S3Result Result = Client.PutObject(Key, File.ReadAll()); + if (!Result.IsSuccess()) + { + throw zen::runtime_error("Failed to upload '{}' to S3: {}", Key, Result.Error); + } + } + Stats.Bytes.fetch_add(Size, 
std::memory_order_relaxed); + }); } - ZEN_DEBUG("Wiping server state '{}'", m_Config.ServerStateDir); - CleanDirectory(m_Config.ServerStateDir, ForceRemoveReadOnlyFiles); - - if (CopySuccess) + void S3Storage::Get(ParallelWork& Work, + WorkerThreadPool& WorkerPool, + const IoHash& Hash, + uint64_t Size, + const std::filesystem::path& DestinationPath, + PhaseStats& Stats) { - ZEN_INFO("Dehydration complete in {}", NiceTimeSpanMs(Timer.GetElapsedTimeMs())); - } -} + std::string Key = m_KeyPrefix + "/cas/" + fmt::format("{}", Hash); -/////////////////////////////////////////////////////////////////////////// + if (Size >= (m_MultipartChunkSize + (m_MultipartChunkSize / 4))) + { + class WorkData + { + public: + WorkData(const std::filesystem::path& DestPath, uint64_t Size) : m_DestFile(DestPath, BasicFile::Mode::kTruncate) + { + PrepareFileForScatteredWrite(m_DestFile.Handle(), Size); + } + ~WorkData() { m_DestFile.Flush(); } + void Write(const void* Data, uint64_t Size, uint64_t Offset) { m_DestFile.Write(Data, Size, Offset); } -constexpr std::string_view S3HydratorPrefix = "s3://"; -constexpr std::string_view S3HydratorType = "s3"; + private: + BasicFile m_DestFile; + }; -struct S3Hydrator : public HydrationStrategyBase -{ - void Configure(const HydrationConfig& Config) override; - void Dehydrate() override; - void Hydrate() override; + std::shared_ptr<WorkData> Data = std::make_shared<WorkData>(DestinationPath, Size); -private: - S3Client CreateS3Client() const; - std::string BuildTimestampFolderName() const; - std::string MakeObjectKey(std::string_view FolderName, const std::filesystem::path& RelPath) const; + uint64_t Offset = 0; + while (Offset < Size) + { + uint64_t ChunkSize = std::min<uint64_t>(m_MultipartChunkSize, Size - Offset); - HydrationConfig m_Config; - std::string m_Bucket; - std::string m_KeyPrefix; // "<user-prefix>/<ModuleId>" or just "<ModuleId>" - no trailing slash - std::string m_Region; - SigV4Credentials m_Credentials; - 
Ref<ImdsCredentialProvider> m_CredentialProvider; + Work.ScheduleWork(WorkerPool, [this, Key = Key, Offset, ChunkSize, Data, &Stats](std::atomic<bool>& AbortFlag) { + Stats.RecordThread(); + if (AbortFlag) + { + return; + } + S3GetObjectResult Chunk = m_Client.GetObjectRange(Key, Offset, ChunkSize); + if (!Chunk.IsSuccess()) + { + throw zen::runtime_error("Failed to download '{}' bytes [{}-{}] from S3: {}", + Key, + Offset, + Offset + ChunkSize - 1, + Chunk.Error); + } - static constexpr uint64_t MultipartChunkSize = 8 * 1024 * 1024; -}; + Data->Write(Chunk.Content.GetData(), Chunk.Content.GetSize(), Offset); + Stats.Bytes.fetch_add(ChunkSize, std::memory_order_relaxed); + }); + Offset += ChunkSize; + } + } + else + { + Work.ScheduleWork( + WorkerPool, + [this, Key = Key, Size, DestinationPath = std::filesystem::path(DestinationPath), &Stats](std::atomic<bool>& AbortFlag) { + Stats.RecordThread(); + if (AbortFlag) + { + return; + } + S3GetObjectResult Chunk = m_Client.GetObject(Key, m_TempDir); + if (!Chunk.IsSuccess()) + { + throw zen::runtime_error("Failed to download '{}' from S3: {}", Key, Chunk.Error); + } -void -S3Hydrator::Configure(const HydrationConfig& Config) -{ - m_Config = Config; + if (IoBufferFileReference FileRef; Chunk.Content.GetFileReference(FileRef)) + { + std::error_code Ec; + std::filesystem::path ChunkPath = PathFromHandle(FileRef.FileHandle, Ec); + if (Ec) + { + WriteFile(DestinationPath, Chunk.Content); + } + else + { + Chunk.Content.SetDeleteOnClose(false); + Chunk.Content = {}; + RenameFile(ChunkPath, DestinationPath, Ec); + if (Ec) + { + Chunk.Content = IoBufferBuilder::MakeFromFile(ChunkPath); + Chunk.Content.SetDeleteOnClose(true); + WriteFile(DestinationPath, Chunk.Content); + } + } + } + else + { + WriteFile(DestinationPath, Chunk.Content); + } + Stats.Bytes.fetch_add(Size, std::memory_order_relaxed); + }); + } + } - CbObjectView Settings = m_Config.Options["settings"].AsObjectView(); - std::string_view Spec; - if 
(!m_Config.TargetSpecification.empty()) + void S3Storage::Touch(ParallelWork& Work, WorkerThreadPool& WorkerPool, const IoHash& Hash, PhaseStats& Stats) { - Spec = m_Config.TargetSpecification; - Spec.remove_prefix(S3HydratorPrefix.size()); + Work.ScheduleWork(WorkerPool, [this, Hash = IoHash(Hash), &Stats](std::atomic<bool>& AbortFlag) { + Stats.RecordThread(); + if (AbortFlag.load()) + { + return; + } + std::string Key = m_KeyPrefix + "/cas/" + fmt::format("{}", Hash); + S3Result Result = m_Client.Touch(Key); + if (!Result.IsSuccess()) + { + throw zen::runtime_error("Failed to touch '{}' in S3: {}", Key, Result.Error); + } + }); } - else + + void S3Storage::Delete(ParallelWork& Work, WorkerThreadPool& WorkerPool) { - std::string_view Uri = Settings["uri"].AsString(); - if (Uri.empty()) + std::string ModulePrefix = m_KeyPrefix + "/"; + S3ListObjectsResult ListResult = m_Client.ListObjects(ModulePrefix); + if (!ListResult.IsSuccess()) { - throw zen::runtime_error("Hydration config 's3' type requires 'settings.uri'"); + throw zen::runtime_error("Failed to list S3 objects for deletion under '{}': {}", ModulePrefix, ListResult.Error); + } + for (const S3ObjectInfo& Obj : ListResult.Objects) + { + Work.ScheduleWork(WorkerPool, [this, Key = Obj.Key](std::atomic<bool>& AbortFlag) { + if (AbortFlag.load()) + { + return; + } + S3Result DelResult = m_Client.DeleteObject(Key); + if (!DelResult.IsSuccess()) + { + throw zen::runtime_error("Failed to delete S3 object '{}': {}", Key, DelResult.Error); + } + }); } - Spec = Uri; - Spec.remove_prefix(S3HydratorPrefix.size()); } - size_t SlashPos = Spec.find('/'); - std::string UserPrefix = SlashPos != std::string_view::npos ? std::string(Spec.substr(SlashPos + 1)) : std::string{}; - m_Bucket = std::string(SlashPos != std::string_view::npos ? Spec.substr(0, SlashPos) : Spec); - m_KeyPrefix = UserPrefix.empty() ? 
m_Config.ModuleId : UserPrefix + "/" + m_Config.ModuleId; - - ZEN_ASSERT(!m_Bucket.empty()); - - std::string Region = std::string(Settings["region"].AsString()); - if (Region.empty()) - { - Region = GetEnvVariable("AWS_DEFAULT_REGION"); + /////////////////////////////////////////////////////////////////////// + // IncrementalHydrator: the only HydrationStrategyBase implementation. + // Summary emission for hydrate/dehydrate operations. + + void LogDehydrateSummary(std::string_view Prefix, + const DehydrateStatistics& Stats, + std::string_view ModuleId, + const std::filesystem::path& Source, + std::string_view Target) + { + const uint64_t HashUs = Stats.Hash.ElapsedUs.load(); + const uint64_t UploadUs = Stats.Upload.ElapsedUs.load(); + ZEN_INFO( + "{} module '{}': {} files ({}) in {}\n" + " Source: {}\n" + " Target: {}\n" + " Load state: {}\n" + " Dir scan: {}\n" + " Hash phase: {} {}/{} ({}) hashed, {}bits/s, {} threads\n" + " List existing: {}\n" + " Upload phase: {} {}/{} ({}) uploaded, {} ({}) touched, {}bits/s, {} threads\n" + " Metadata save: {}\n" + " Clean: {}", + Prefix, + ModuleId, + ThousandsNum(Stats.TotalFiles.load()), + NiceBytes(Stats.TotalBytes.load()), + NiceLatencyNs(Stats.TotalUs.load() * 1000), + Source.generic_string(), + Target, + NiceLatencyNs(Stats.LoadStateUs.load() * 1000), + NiceLatencyNs(Stats.DirScanUs.load() * 1000), + NiceLatencyNs(HashUs * 1000), + ThousandsNum(Stats.Hash.Files.load()), + ThousandsNum(Stats.TotalFiles.load()), + NiceBytes(Stats.Hash.Bytes.load()), + NiceNum(BitsPerSecond(Stats.Hash.Bytes.load(), HashUs)), + Stats.Hash.ThreadIds.size(), + NiceLatencyNs(Stats.ListExistingUs.load() * 1000), + NiceLatencyNs(UploadUs * 1000), + ThousandsNum(Stats.Upload.Files.load()), + ThousandsNum(Stats.TotalFiles.load()), + NiceBytes(Stats.Upload.Bytes.load()), + ThousandsNum(Stats.Touch.Files.load()), + NiceBytes(Stats.Touch.Bytes.load()), + NiceNum(BitsPerSecond(Stats.Upload.Bytes.load(), UploadUs)), + Stats.Upload.ThreadIds.size(), + 
NiceLatencyNs(Stats.MetadataSaveUs.load() * 1000), + NiceLatencyNs(Stats.CleanUs.load() * 1000)); } - if (Region.empty()) - { - Region = GetEnvVariable("AWS_REGION"); - } - if (Region.empty()) - { - Region = "us-east-1"; - } - m_Region = std::move(Region); - std::string AccessKeyId = GetEnvVariable("AWS_ACCESS_KEY_ID"); - if (AccessKeyId.empty()) - { - m_CredentialProvider = Ref<ImdsCredentialProvider>(new ImdsCredentialProvider({})); + void LogHydrateSummary(std::string_view Prefix, + const HydrateStatistics& Stats, + std::string_view ModuleId, + std::string_view Source, + const std::filesystem::path& Target) + { + const uint64_t DownloadUs = Stats.Download.ElapsedUs.load(); + ZEN_INFO( + "{} module '{}': {} files ({}) in {}\n" + " Source: {}\n" + " Target: {}\n" + " Load metadata: {}\n" + " Download phase: {} {}/{} ({}) downloaded, {}bits/s, {} threads\n" + " Clean: {}\n" + " Rename/copy: {}\n" + " Verify scan: {}", + Prefix, + ModuleId, + ThousandsNum(Stats.TotalFiles.load()), + NiceBytes(Stats.TotalBytes.load()), + NiceLatencyNs(Stats.TotalUs.load() * 1000), + Source, + Target.generic_string(), + NiceLatencyNs(Stats.LoadMetadataUs.load() * 1000), + NiceLatencyNs(DownloadUs * 1000), + ThousandsNum(Stats.Download.Files.load()), + ThousandsNum(Stats.TotalFiles.load()), + NiceBytes(Stats.Download.Bytes.load()), + NiceNum(BitsPerSecond(Stats.Download.Bytes.load(), DownloadUs)), + Stats.Download.ThreadIds.size(), + NiceLatencyNs(Stats.CleanUs.load() * 1000), + NiceLatencyNs(Stats.RenameOrCopyUs.load() * 1000), + NiceLatencyNs(Stats.VerifyScanUs.load() * 1000)); } - else - { - m_Credentials.AccessKeyId = std::move(AccessKeyId); - m_Credentials.SecretAccessKey = GetEnvVariable("AWS_SECRET_ACCESS_KEY"); - m_Credentials.SessionToken = GetEnvVariable("AWS_SESSION_TOKEN"); - } -} -S3Client -S3Hydrator::CreateS3Client() const -{ - S3ClientOptions Options; - Options.BucketName = m_Bucket; - Options.Region = m_Region; + 
/////////////////////////////////////////////////////////////////////// + // Holds a per-module StorageBase and threading context; drives the + // hydrate/dehydrate algorithm. - CbObjectView Settings = m_Config.Options["settings"].AsObjectView(); - std::string_view Endpoint = Settings["endpoint"].AsString(); - if (!Endpoint.empty()) + class IncrementalHydrator : public HydrationStrategyBase { - Options.Endpoint = std::string(Endpoint); - Options.PathStyle = Settings["path-style"].AsBool(); - } + public: + IncrementalHydrator(const HydrationConfig& Config, std::unique_ptr<StorageBase> Storage); + virtual ~IncrementalHydrator() override; - if (m_CredentialProvider) - { - Options.CredentialProvider = m_CredentialProvider; - } - else - { - Options.Credentials = m_Credentials; - } + virtual void Dehydrate(const CbObject& CachedState) override; + virtual CbObject Hydrate() override; + virtual void Obliterate() override; - Options.HttpSettings.MaximumInMemoryDownloadSize = 16u * 1024u; + private: + struct Entry + { + std::filesystem::path RelativePath; + uint64_t Size; + uint64_t ModTick; + IoHash Hash; + }; - return S3Client(Options); -} + std::unique_ptr<StorageBase> m_Storage; + HydrationConfig m_Config; + WorkerThreadPool m_FallbackWorkPool; + std::atomic<bool> m_FallbackAbortFlag{false}; + std::atomic<bool> m_FallbackPauseFlag{false}; + HydrationConfig::ThreadingOptions m_Threading{.WorkerPool = &m_FallbackWorkPool, + .AbortFlag = &m_FallbackAbortFlag, + .PauseFlag = &m_FallbackPauseFlag}; + }; -std::string -S3Hydrator::BuildTimestampFolderName() const -{ - UtcTime Now = UtcTime::Now(); - return fmt::format("{:04d}{:02d}{:02d}-{:02d}{:02d}{:02d}-{:03d}", - Now.Tm.tm_year + 1900, - Now.Tm.tm_mon + 1, - Now.Tm.tm_mday, - Now.Tm.tm_hour, - Now.Tm.tm_min, - Now.Tm.tm_sec, - Now.Ms); -} + /////////////////////////////////////////////////////////////////////// + // IncrementalHydrator implementations -std::string -S3Hydrator::MakeObjectKey(std::string_view FolderName, 
const std::filesystem::path& RelPath) const -{ - return m_KeyPrefix + "/" + std::string(FolderName) + "/" + RelPath.generic_string(); -} + IncrementalHydrator::IncrementalHydrator(const HydrationConfig& Config, std::unique_ptr<StorageBase> Storage) + : m_Storage(std::move(Storage)) + , m_Config(Config) + , m_FallbackWorkPool(0) + { + if (Config.Threading) + { + m_Threading = *Config.Threading; + } + } -void -S3Hydrator::Dehydrate() -{ - ZEN_INFO("Dehydrating state from '{}' to s3://{}/{}", m_Config.ServerStateDir, m_Bucket, m_KeyPrefix); + IncrementalHydrator::~IncrementalHydrator() { m_Storage.reset(); } - try + void IncrementalHydrator::Dehydrate(const CbObject& CachedState) { - S3Client Client = CreateS3Client(); - std::string FolderName = BuildTimestampFolderName(); - uint64_t TotalBytes = 0; - uint32_t FileCount = 0; - Stopwatch Timer; + Stopwatch TotalTimer; + DehydrateStatistics Stats; + const std::string StorageTarget = m_Storage->Describe(); - DirectoryContent DirContent; - GetDirectoryContent(m_Config.ServerStateDir, DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::Recursive, DirContent); - - for (const std::filesystem::path& AbsPath : DirContent.Files) + const std::filesystem::path ServerStateDir = MakeSafeAbsolutePath(m_Config.ServerStateDir); + try { - std::filesystem::path RelPath = AbsPath.lexically_relative(m_Config.ServerStateDir); - if (RelPath.empty() || *RelPath.begin() == "..") + std::unordered_map<std::string, size_t> StateEntryLookup; + std::vector<Entry> StateEntries; { - throw zen::runtime_error( - "lexically_relative produced a '..'-escape path for '{}' relative to '{}' - " - "path form mismatch (e.g. 
\\\\?\\ prefix on one but not the other)", - AbsPath.string(), - m_Config.ServerStateDir.string()); + Stopwatch LoadStateTimer; + for (CbFieldView FieldView : CachedState["Files"].AsArrayView()) + { + CbObjectView EntryView = FieldView.AsObjectView(); + std::filesystem::path RelativePath(EntryView["Path"].AsString()); + uint64_t Size = EntryView["Size"].AsUInt64(); + uint64_t ModTick = EntryView["ModTick"].AsUInt64(); + IoHash Hash = EntryView["Hash"].AsHash(); + + StateEntryLookup.insert_or_assign(RelativePath.generic_string(), StateEntries.size()); + StateEntries.push_back(Entry{.RelativePath = RelativePath, .Size = Size, .ModTick = ModTick, .Hash = Hash}); + } + Stats.LoadStateUs = LoadStateTimer.GetElapsedTimeUs(); } - if (*RelPath.begin() == ".sentry-native") + + DirectoryContent DirContent; { - continue; + Stopwatch DirScanTimer; + GetDirectoryContent(*m_Threading.WorkerPool, + ServerStateDir, + DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::Recursive | + DirectoryContentFlags::IncludeFileSizes | DirectoryContentFlags::IncludeModificationTick, + DirContent); + Stats.DirScanUs = DirScanTimer.GetElapsedTimeUs(); } - std::string Key = MakeObjectKey(FolderName, RelPath); - BasicFile File(AbsPath, BasicFile::Mode::kRead); - uint64_t FileSize = File.FileSize(); + ZEN_INFO("Dehydrating module '{}' from folder '{}'. 
{} ({}) files", + m_Config.ModuleId, + m_Config.ServerStateDir, + DirContent.Files.size(), + NiceBytes(std::accumulate(DirContent.FileSizes.begin(), DirContent.FileSizes.end(), uint64_t(0)))); - S3Result UploadResult = Client.PutObjectMultipart( - Key, - FileSize, - [&File](uint64_t Offset, uint64_t Size) { return File.ReadRange(Offset, Size); }, - MultipartChunkSize); - if (!UploadResult.IsSuccess()) - { - throw zen::runtime_error("Failed to upload '{}' to S3: {}", Key, UploadResult.Error); - } + std::vector<Entry> Entries; + Entries.resize(DirContent.Files.size()); - TotalBytes += FileSize; - ++FileCount; - } + uint64_t TotalBytes = 0; + uint64_t TotalFiles = 0; - // Write current-state.json - uint64_t UploadDurationMs = Timer.GetElapsedTimeMs(); - - UtcTime Now = UtcTime::Now(); - std::string UploadTimeUtc = fmt::format("{:04d}-{:02d}-{:02d}T{:02d}:{:02d}:{:02d}.{:03d}Z", - Now.Tm.tm_year + 1900, - Now.Tm.tm_mon + 1, - Now.Tm.tm_mday, - Now.Tm.tm_hour, - Now.Tm.tm_min, - Now.Tm.tm_sec, - Now.Ms); - - CbObjectWriter Meta; - Meta << "FolderName" << FolderName; - Meta << "ModuleId" << m_Config.ModuleId; - Meta << "HostName" << GetMachineName(); - Meta << "UploadTimeUtc" << UploadTimeUtc; - Meta << "UploadDurationMs" << UploadDurationMs; - Meta << "TotalSizeBytes" << TotalBytes; - Meta << "FileCount" << FileCount; - - ExtendableStringBuilder<1024> JsonBuilder; - Meta.Save().ToJson(JsonBuilder); - - std::string MetaKey = m_KeyPrefix + "/current-state.json"; - std::string_view JsonText = JsonBuilder.ToView(); - IoBuffer MetaBuf(IoBuffer::Clone, JsonText.data(), JsonText.size()); - S3Result MetaUploadResult = Client.PutObject(MetaKey, std::move(MetaBuf)); - if (!MetaUploadResult.IsSuccess()) - { - throw zen::runtime_error("Failed to write current-state.json to '{}': {}", MetaKey, MetaUploadResult.Error); - } + std::unordered_set<IoHash> ExistsLookup; - ZEN_INFO("Dehydration complete: {} files, {}, {}", FileCount, NiceBytes(TotalBytes), 
NiceTimeSpanMs(UploadDurationMs)); - } - catch (std::exception& Ex) - { - // Any in-progress multipart upload has already been aborted by PutObjectMultipart. - // current-state.json is only written on success, so the previous S3 state remains valid. - ZEN_WARN("S3 dehydration failed: {}. S3 state not updated.", Ex.what()); - } -} + { + Stopwatch HashTimer; + ParallelWork Work(*m_Threading.AbortFlag, *m_Threading.PauseFlag, WorkerThreadPool::EMode::EnableBacklog); -void -S3Hydrator::Hydrate() -{ - ZEN_INFO("Hydrating state from s3://{}/{} to '{}'", m_Bucket, m_KeyPrefix, m_Config.ServerStateDir); + for (size_t FileIndex = 0; FileIndex < DirContent.Files.size(); FileIndex++) + { + const std::filesystem::path AbsPath = MakeSafeAbsolutePath(DirContent.Files[FileIndex]); + if (AbsPath.filename() == "reserve.gc") + { + continue; + } + const std::filesystem::path RelativePath = FastRelativePath(ServerStateDir, DirContent.Files[FileIndex]); + if (*RelativePath.begin() == ".sentry-native") + { + continue; + } + if (RelativePath == ".lock") + { + continue; + } + + Entry& CurrentEntry = Entries[TotalFiles]; + CurrentEntry.RelativePath = RelativePath; + CurrentEntry.Size = DirContent.FileSizes[FileIndex]; + CurrentEntry.ModTick = DirContent.FileModificationTicks[FileIndex]; + + bool FoundHash = false; + if (auto KnownIt = StateEntryLookup.find(CurrentEntry.RelativePath.generic_string()); KnownIt != StateEntryLookup.end()) + { + const Entry& StateEntry = StateEntries[KnownIt->second]; + if (StateEntry.Size == CurrentEntry.Size && StateEntry.ModTick == CurrentEntry.ModTick) + { + CurrentEntry.Hash = StateEntry.Hash; + FoundHash = true; + } + } - Stopwatch Timer; - const bool ForceRemoveReadOnlyFiles = true; + if (!FoundHash) + { + Work.ScheduleWork(*m_Threading.WorkerPool, + [AbsPath, EntryIndex = TotalFiles, &Entries, &Stats](std::atomic<bool>& AbortFlag) { + Stats.Hash.RecordThread(); + if (AbortFlag) + { + return; + } + + Entry& CurrentEntry = Entries[EntryIndex]; + + bool 
FoundHash = false; + if (AbsPath.extension().empty()) + { + auto It = CurrentEntry.RelativePath.begin(); + if (It != CurrentEntry.RelativePath.end() && It->filename().string().ends_with("cas")) + { + IoHash RawHash; + uint64_t RawSize; + CompressedBuffer Compressed = CompressedBuffer::FromCompressed( + SharedBuffer(IoBufferBuilder::MakeFromFile(AbsPath)), + RawHash, + RawSize); + if (Compressed) + { + // We compose a meta-hash since taking the RawHash might collide with an + // existing non-compressed file with the same content The collision is + // unlikely except if the compressed data is zero bytes causing RawHash + // to be the same as an empty file. + IoHashStream Hasher; + Hasher.Append(RawHash.Hash, sizeof(RawHash.Hash)); + Hasher.Append(&CurrentEntry.Size, sizeof(CurrentEntry.Size)); + CurrentEntry.Hash = Hasher.GetHash(); + FoundHash = true; + } + } + } + + if (!FoundHash) + { + CurrentEntry.Hash = IoHash::HashBuffer(IoBufferBuilder::MakeFromFile(AbsPath)); + } + Stats.Hash.Bytes.fetch_add(CurrentEntry.Size, std::memory_order_relaxed); + }); + Stats.Hash.Files.fetch_add(1, std::memory_order_relaxed); + } + TotalFiles++; + TotalBytes += CurrentEntry.Size; + } - // Clean temp dir before starting in case of leftover state from a previous failed hydration - ZEN_DEBUG("Cleaning temp dir '{}'", m_Config.TempDir); - CleanDirectory(m_Config.TempDir, ForceRemoveReadOnlyFiles); + { + Stopwatch ListTimer; + std::vector<IoHash> ExistingEntries = m_Storage->List(); + ExistsLookup.insert(ExistingEntries.begin(), ExistingEntries.end()); + Stats.ListExistingUs = ListTimer.GetElapsedTimeUs(); + } - bool WipeServerState = false; + Work.Wait(); - try - { - S3Client Client = CreateS3Client(); - std::string MetaKey = m_KeyPrefix + "/current-state.json"; + Entries.resize(TotalFiles); + Stats.Hash.ElapsedUs = HashTimer.GetElapsedTimeUs(); + Stats.TotalFiles = TotalFiles; + Stats.TotalBytes = TotalBytes; + } - S3GetObjectResult MetaResult = Client.GetObject(MetaKey); - if 
(!MetaResult.IsSuccess()) - { - if (MetaResult.Error == S3GetObjectResult::NotFoundErrorText) + uint64_t UploadDurationMs = 0; { - ZEN_INFO("No state found in S3 at {}", MetaKey); + Stopwatch UploadTimer; + ParallelWork Work(*m_Threading.AbortFlag, *m_Threading.PauseFlag, WorkerThreadPool::EMode::EnableBacklog); - ZEN_DEBUG("Wiping server state '{}'", m_Config.ServerStateDir); - CleanDirectory(m_Config.ServerStateDir, ForceRemoveReadOnlyFiles); - return; + for (const Entry& CurrentEntry : Entries) + { + if (!ExistsLookup.contains(CurrentEntry.Hash)) + { + m_Storage->Put(Work, + *m_Threading.WorkerPool, + CurrentEntry.Hash, + CurrentEntry.Size, + MakeSafeAbsolutePath(ServerStateDir / CurrentEntry.RelativePath), + Stats.Upload); + Stats.Upload.Files.fetch_add(1, std::memory_order_relaxed); + } + else + { + // Refresh the backend's modification time so lifecycle-expiration policies + // do not evict CAS entries that are still referenced by this module. + m_Storage->Touch(Work, *m_Threading.WorkerPool, CurrentEntry.Hash, Stats.Touch); + Stats.Touch.Files.fetch_add(1, std::memory_order_relaxed); + Stats.Touch.Bytes.fetch_add(CurrentEntry.Size, std::memory_order_relaxed); + } + } + + Work.Wait(); + Stats.Upload.ElapsedUs = UploadTimer.GetElapsedTimeUs(); + UploadDurationMs = TotalTimer.GetElapsedTimeMs(); + + Stopwatch MetadataTimer; + UtcTime Now = UtcTime::Now(); + std::string UploadTimeUtc = fmt::format("{:04d}-{:02d}-{:02d}T{:02d}:{:02d}:{:02d}.{:03d}Z", + Now.Tm.tm_year + 1900, + Now.Tm.tm_mon + 1, + Now.Tm.tm_mday, + Now.Tm.tm_hour, + Now.Tm.tm_min, + Now.Tm.tm_sec, + Now.Ms); + + CbObjectWriter Meta; + Meta << "SourceFolder" << ServerStateDir.generic_string(); + Meta << "ModuleId" << m_Config.ModuleId; + Meta << "HostName" << GetMachineName(); + Meta << "UploadTimeUtc" << UploadTimeUtc; + Meta << "UploadDurationMs" << UploadDurationMs; + Meta << "TotalSizeBytes" << TotalBytes; + Meta << "StorageSettings" << m_Storage->GetSettings(); + + Meta.BeginArray("Files"); + 
for (const Entry& CurrentEntry : Entries) + { + Meta.BeginObject(); + { + Meta << "Path" << CurrentEntry.RelativePath.generic_string(); + Meta << "Size" << CurrentEntry.Size; + Meta << "ModTick" << CurrentEntry.ModTick; + Meta << "Hash" << CurrentEntry.Hash; + } + Meta.EndObject(); + } + Meta.EndArray(); + + m_Storage->SaveMetadata(Meta.Save()); + Stats.MetadataSaveUs = MetadataTimer.GetElapsedTimeUs(); } - throw zen::runtime_error("Failed to read current-state.json from '{}': {}", MetaKey, MetaResult.Error); - } - std::string ParseError; - json11::Json MetaJson = json11::Json::parse(std::string(MetaResult.AsText()), ParseError); - if (!ParseError.empty()) - { - throw zen::runtime_error("Failed to parse current-state.json from '{}': {}", MetaKey, ParseError); - } + ZEN_DEBUG("Cleaning server state '{}'", m_Config.ServerStateDir); + { + Stopwatch CleanTimer; + CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, ServerStateDir); + Stats.CleanUs = CleanTimer.GetElapsedTimeUs(); + } - std::string FolderName = MetaJson["FolderName"].string_value(); - if (FolderName.empty()) - { - throw zen::runtime_error("current-state.json from '{}' has missing or empty FolderName", MetaKey); + Stats.TotalUs = TotalTimer.GetElapsedTimeUs(); + LogDehydrateSummary("Dehydration complete", Stats, m_Config.ModuleId, ServerStateDir, StorageTarget); } - - std::string FolderPrefix = m_KeyPrefix + "/" + FolderName + "/"; - S3ListObjectsResult ListResult = Client.ListObjects(FolderPrefix); - if (!ListResult.IsSuccess()) + catch (const std::exception& Ex) { - throw zen::runtime_error("Failed to list S3 objects under '{}': {}", FolderPrefix, ListResult.Error); + ZEN_WARN("Dehydration of module '{}' failed: {}. 
Leaving server state '{}'", + m_Config.ModuleId, + Ex.what(), + m_Config.ServerStateDir); + Stats.TotalUs = TotalTimer.GetElapsedTimeUs(); + LogDehydrateSummary("Dehydration failed", Stats, m_Config.ModuleId, ServerStateDir, StorageTarget); } + } - for (const S3ObjectInfo& Obj : ListResult.Objects) + CbObject IncrementalHydrator::Hydrate() + { + Stopwatch TotalTimer; + HydrateStatistics Stats; + const std::string StorageSource = m_Storage->Describe(); + + const std::filesystem::path ServerStateDir = MakeSafeAbsolutePath(m_Config.ServerStateDir); + const std::filesystem::path TempDir = MakeSafeAbsolutePath(m_Config.TempDir); + try { - if (!Obj.Key.starts_with(FolderPrefix)) + CbObject Meta; { - ZEN_WARN("Skipping unexpected S3 key '{}' (expected prefix '{}')", Obj.Key, FolderPrefix); - continue; + Stopwatch LoadTimer; + Meta = m_Storage->LoadMetadata(); + Stats.LoadMetadataUs = LoadTimer.GetElapsedTimeUs(); } + if (!Meta) + { + ZEN_INFO("No dehydrated state for module {} found, cleaning server state: '{}'", + m_Config.ModuleId, + m_Config.ServerStateDir); + CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, ServerStateDir); + return CbObject(); + } + + std::unordered_map<std::string, size_t> EntryLookup; + std::vector<Entry> Entries; + uint64_t TotalSize = 0; - std::string RelKey = Obj.Key.substr(FolderPrefix.size()); - if (RelKey.empty()) + for (CbFieldView FieldView : Meta["Files"]) { - continue; + CbObjectView EntryView = FieldView.AsObjectView(); + if (EntryView) + { + Entry NewEntry = {.RelativePath = std::filesystem::path(EntryView["Path"].AsString()), + .Size = EntryView["Size"].AsUInt64(), + .ModTick = EntryView["ModTick"].AsUInt64(), + .Hash = EntryView["Hash"].AsHash()}; + TotalSize += NewEntry.Size; + EntryLookup.insert_or_assign(NewEntry.RelativePath.generic_string(), Entries.size()); + Entries.emplace_back(std::move(NewEntry)); + } } - std::filesystem::path DestPath = MakeSafeAbsolutePath(m_Config.TempDir / 
std::filesystem::path(RelKey)); - CreateDirectories(DestPath.parent_path()); - if (Obj.Size > MultipartChunkSize) + Stats.TotalFiles = Entries.size(); + Stats.TotalBytes = TotalSize; + + ZEN_INFO("Hydrating module '{}' to folder '{}'. {} ({}) files", + m_Config.ModuleId, + m_Config.ServerStateDir, + Entries.size(), + NiceBytes(TotalSize)); + + m_Storage->ParseSettings(Meta["StorageSettings"].AsObjectView()); + { - BasicFile DestFile(DestPath, BasicFile::Mode::kTruncate); - DestFile.SetFileSize(Obj.Size); + Stopwatch DownloadTimer; + ParallelWork Work(*m_Threading.AbortFlag, *m_Threading.PauseFlag, WorkerThreadPool::EMode::EnableBacklog); + + for (const Entry& CurrentEntry : Entries) + { + std::filesystem::path Path = MakeSafeAbsolutePath(TempDir / CurrentEntry.RelativePath); + CreateDirectories(Path.parent_path()); + m_Storage->Get(Work, *m_Threading.WorkerPool, CurrentEntry.Hash, CurrentEntry.Size, Path, Stats.Download); + Stats.Download.Files.fetch_add(1, std::memory_order_relaxed); + } - BasicFileWriter Writer(DestFile, 64 * 1024); + Work.Wait(); + Stats.Download.ElapsedUs = DownloadTimer.GetElapsedTimeUs(); + } + + // Downloaded successfully - swap into ServerStateDir + ZEN_DEBUG("Cleaning server state '{}'", m_Config.ServerStateDir); + { + Stopwatch CleanTimer; + CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, ServerStateDir); + Stats.CleanUs = CleanTimer.GetElapsedTimeUs(); + } - uint64_t Offset = 0; - while (Offset < Obj.Size) + { + Stopwatch RenameTimer; + // If the two paths share at least one common component they are on the same drive/volume + // and atomic renames will succeed. Otherwise fall back to a full copy. 
+ auto [ItTmp, ItState] = std::mismatch(TempDir.begin(), TempDir.end(), ServerStateDir.begin(), ServerStateDir.end()); + if (ItTmp != TempDir.begin()) { - uint64_t ChunkSize = std::min<uint64_t>(MultipartChunkSize, Obj.Size - Offset); - S3GetObjectResult Chunk = Client.GetObjectRange(Obj.Key, Offset, ChunkSize); - if (!Chunk.IsSuccess()) + DirectoryContent DirContent; + GetDirectoryContent(*m_Threading.WorkerPool, + TempDir, + DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::IncludeDirs, + DirContent); + + for (const std::filesystem::path& AbsPath : DirContent.Directories) { - throw zen::runtime_error("Failed to download '{}' bytes [{}-{}] from S3: {}", - Obj.Key, - Offset, - Offset + ChunkSize - 1, - Chunk.Error); + std::filesystem::path Dest = MakeSafeAbsolutePath(ServerStateDir / AbsPath.filename()); + std::error_code Ec = RenameDirectoryWithRetry(AbsPath, Dest); + if (Ec) + { + throw std::system_error(Ec, fmt::format("Failed to rename directory from '{}' to '{}'", AbsPath, Dest)); + } + } + for (const std::filesystem::path& AbsPath : DirContent.Files) + { + std::filesystem::path Dest = MakeSafeAbsolutePath(ServerStateDir / AbsPath.filename()); + std::error_code Ec = RenameFileWithRetry(AbsPath, Dest); + if (Ec) + { + throw std::system_error(Ec, fmt::format("Failed to rename file from '{}' to '{}'", AbsPath, Dest)); + } } - Writer.Write(Chunk.Content.GetData(), Chunk.Content.GetSize(), Offset); - Offset += ChunkSize; + ZEN_DEBUG("Cleaning temp dir '{}'", m_Config.TempDir); + CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, TempDir); } - - Writer.Flush(); - } - else - { - S3GetObjectResult Chunk = Client.GetObject(Obj.Key, m_Config.TempDir); - if (!Chunk.IsSuccess()) + else { - throw zen::runtime_error("Failed to download '{}' from S3: {}", Obj.Key, Chunk.Error); + // Slow path: TempDir and ServerStateDir are on different filesystems, so rename + // would fail. 
Copy the tree instead and clean up the temp files afterwards. + ZEN_DEBUG("TempDir and ServerStateDir are on different filesystems - using CopyTree"); + CopyTree(TempDir, ServerStateDir, {.EnableClone = true}); + ZEN_DEBUG("Cleaning temp dir '{}'", m_Config.TempDir); + CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, TempDir); } + Stats.RenameOrCopyUs = RenameTimer.GetElapsedTimeUs(); + } - if (IoBufferFileReference FileRef; Chunk.Content.GetFileReference(FileRef)) + CbObject StateObject; + { + Stopwatch VerifyTimer; + DirectoryContent DirContent; + GetDirectoryContent(*m_Threading.WorkerPool, + ServerStateDir, + DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::Recursive | + DirectoryContentFlags::IncludeFileSizes | DirectoryContentFlags::IncludeModificationTick, + DirContent); + + CbObjectWriter HydrateState; + HydrateState.BeginArray("Files"); + for (size_t FileIndex = 0; FileIndex < DirContent.Files.size(); FileIndex++) { - std::error_code Ec; - std::filesystem::path ChunkPath = PathFromHandle(FileRef.FileHandle, Ec); - if (Ec) + std::filesystem::path RelativePath = FastRelativePath(ServerStateDir, DirContent.Files[FileIndex]); + + if (auto It = EntryLookup.find(RelativePath.generic_string()); It != EntryLookup.end()) { - WriteFile(DestPath, Chunk.Content); + HydrateState.BeginObject(); + { + HydrateState << "Path" << RelativePath.generic_string(); + HydrateState << "Size" << DirContent.FileSizes[FileIndex]; + HydrateState << "ModTick" << DirContent.FileModificationTicks[FileIndex]; + HydrateState << "Hash" << Entries[It->second].Hash; + } + HydrateState.EndObject(); } else { - Chunk.Content.SetDeleteOnClose(false); - Chunk.Content = {}; - RenameFile(ChunkPath, DestPath, Ec); + ZEN_ASSERT(false); } } - else - { - WriteFile(DestPath, Chunk.Content); - } + HydrateState.EndArray(); + + StateObject = HydrateState.Save(); + Stats.VerifyScanUs = VerifyTimer.GetElapsedTimeUs(); } + + Stats.TotalUs = 
TotalTimer.GetElapsedTimeUs(); + LogHydrateSummary("Hydration complete", Stats, m_Config.ModuleId, StorageSource, ServerStateDir); + + return StateObject; + } + catch (const std::exception& Ex) + { + ZEN_WARN("Hydration of module '{}' failed: {}. Cleaning server state '{}'", + m_Config.ModuleId, + Ex.what(), + m_Config.ServerStateDir); + CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, ServerStateDir); + ZEN_DEBUG("Cleaning temp dir '{}'", m_Config.TempDir); + CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, TempDir); + Stats.TotalUs = TotalTimer.GetElapsedTimeUs(); + LogHydrateSummary("Hydration failed", Stats, m_Config.ModuleId, StorageSource, ServerStateDir); + return {}; } + } - // Downloaded successfully - swap into ServerStateDir - ZEN_DEBUG("Wiping server state '{}'", m_Config.ServerStateDir); - CleanDirectory(m_Config.ServerStateDir, ForceRemoveReadOnlyFiles); + void IncrementalHydrator::Obliterate() + { + const std::filesystem::path ServerStateDir = MakeSafeAbsolutePath(m_Config.ServerStateDir); + const std::filesystem::path TempDir = MakeSafeAbsolutePath(m_Config.TempDir); - // If the two paths share at least one common component they are on the same drive/volume - // and atomic renames will succeed. Otherwise fall back to a full copy. - auto [ItTmp, ItState] = - std::mismatch(m_Config.TempDir.begin(), m_Config.TempDir.end(), m_Config.ServerStateDir.begin(), m_Config.ServerStateDir.end()); - if (ItTmp != m_Config.TempDir.begin()) + try { - DirectoryContent DirContent; - GetDirectoryContent(m_Config.TempDir, DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::IncludeDirs, DirContent); + ParallelWork Work(*m_Threading.AbortFlag, *m_Threading.PauseFlag, WorkerThreadPool::EMode::EnableBacklog); + m_Storage->Delete(Work, *m_Threading.WorkerPool); + Work.Wait(); + } + catch (const std::exception& Ex) + { + ZEN_WARN("Failed to delete backend storage for module '{}': {}. 
Proceeding with local cleanup.", m_Config.ModuleId, Ex.what()); + } - for (const std::filesystem::path& AbsPath : DirContent.Directories) - { - std::filesystem::path Dest = MakeSafeAbsolutePath(m_Config.ServerStateDir / AbsPath.filename()); - RenameDirectory(AbsPath, Dest); - } - for (const std::filesystem::path& AbsPath : DirContent.Files) - { - std::filesystem::path Dest = MakeSafeAbsolutePath(m_Config.ServerStateDir / AbsPath.filename()); - RenameFile(AbsPath, Dest); - } + CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, ServerStateDir); + CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, TempDir); + } - ZEN_DEBUG("Cleaning temp dir '{}'", m_Config.TempDir); - CleanDirectory(m_Config.TempDir, ForceRemoveReadOnlyFiles); +} // namespace hydration_impl + +/////////////////////////////////////////////////////////////////////////// +// HydrationBase subclasses - own hub-wide backend state, hand per-module +// storages the exact inputs they need in CreateHydrator. 
+ +class FileHydration : public HydrationBase +{ +public: + explicit FileHydration(const Configuration& Config); + + virtual std::unique_ptr<HydrationStrategyBase> CreateHydrator(const HydrationConfig& Config) override; + +private: + std::filesystem::path m_StorageRoot; +}; + +class S3Hydration : public HydrationBase +{ +public: + explicit S3Hydration(const Configuration& Config); + + virtual std::unique_ptr<HydrationStrategyBase> CreateHydrator(const HydrationConfig& Config) override; + +private: + std::string m_Bucket; + std::string m_Region; + std::string m_Endpoint; + bool m_PathStyle = false; + std::string m_KeyPrefixRoot; + SigV4Credentials m_Credentials; + Ref<ImdsCredentialProvider> m_CredentialProvider; + std::unique_ptr<S3Client> m_Client; + uint64_t m_DefaultMultipartChunkSize; +}; + +/////////////////////////////////////////////////////////////////////////// +// Implementations + +FileHydration::FileHydration(const Configuration& Config) +{ + if (!Config.TargetSpecification.empty()) + { + m_StorageRoot = Utf8ToWide(Config.TargetSpecification.substr(hydration_impl::FileStorage::Prefix.length())); + if (m_StorageRoot.empty()) + { + throw zen::runtime_error("Hydration config 'file' type requires a directory path"); } - else + } + else + { + CbObjectView Settings = Config.Options["settings"].AsObjectView(); + std::string_view Path = Settings["path"].AsString(); + if (Path.empty()) { - // Slow path: TempDir and ServerStateDir are on different filesystems, so rename - // would fail. Copy the tree instead and clean up the temp files afterwards. 
- ZEN_DEBUG("TempDir and ServerStateDir are on different filesystems - using CopyTree"); - CopyTree(m_Config.TempDir, m_Config.ServerStateDir, {.EnableClone = true}); - ZEN_DEBUG("Cleaning temp dir '{}'", m_Config.TempDir); - CleanDirectory(m_Config.TempDir, ForceRemoveReadOnlyFiles); + throw zen::runtime_error("Hydration config 'file' type requires 'settings.path'"); + } + m_StorageRoot = Utf8ToWide(std::string(Path)); + } + MakeSafeAbsolutePathInPlace(m_StorageRoot); +} + +std::unique_ptr<HydrationStrategyBase> +FileHydration::CreateHydrator(const HydrationConfig& Config) +{ + using namespace hydration_impl; + return std::make_unique<IncrementalHydrator>(Config, std::make_unique<FileStorage>(m_StorageRoot / Config.ModuleId)); +} + +S3Hydration::S3Hydration(const Configuration& Config) +{ + using namespace hydration_impl; + + CbObjectView Settings = Config.Options["settings"].AsObjectView(); + std::string_view Spec; + if (!Config.TargetSpecification.empty()) + { + Spec = Config.TargetSpecification; + Spec.remove_prefix(S3Storage::Prefix.size()); + } + else + { + std::string_view Uri = Settings["uri"].AsString(); + if (Uri.empty()) + { + throw zen::runtime_error("Incremental S3 hydration config requires 'settings.uri'"); } + Spec = Uri; + Spec.remove_prefix(S3Storage::Prefix.size()); + } + + size_t SlashPos = Spec.find('/'); + m_Bucket = std::string(SlashPos != std::string_view::npos ? Spec.substr(0, SlashPos) : Spec); + m_KeyPrefixRoot = SlashPos != std::string_view::npos ? 
std::string(Spec.substr(SlashPos + 1)) : std::string{}; + + if (m_Bucket.empty()) + { + throw zen::runtime_error("Incremental S3 hydration config requires a bucket name"); + } - ZEN_INFO("Hydration complete from folder '{}' in {}", FolderName, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + std::string Region = std::string(Settings["region"].AsString()); + if (Region.empty()) + { + Region = GetEnvVariable("AWS_DEFAULT_REGION"); + } + if (Region.empty()) + { + Region = GetEnvVariable("AWS_REGION"); + } + if (Region.empty()) + { + Region = "us-east-1"; } - catch (std::exception& Ex) + m_Region = std::move(Region); + + std::string_view Endpoint = Settings["endpoint"].AsString(); + if (!Endpoint.empty()) { - ZEN_WARN("S3 hydration failed: {}. Will wipe any partially installed state.", Ex.what()); + m_Endpoint = std::string(Endpoint); + m_PathStyle = Settings["path-style"].AsBool(); + } - // We don't do the clean right here to avoid potentially running into double-throws - WipeServerState = true; + std::string AccessKeyId = GetEnvVariable("AWS_ACCESS_KEY_ID"); + if (AccessKeyId.empty()) + { + m_CredentialProvider = Ref<ImdsCredentialProvider>(new ImdsCredentialProvider({})); + } + else + { + m_Credentials.AccessKeyId = std::move(AccessKeyId); + m_Credentials.SecretAccessKey = GetEnvVariable("AWS_SECRET_ACCESS_KEY"); + m_Credentials.SessionToken = GetEnvVariable("AWS_SESSION_TOKEN"); } - if (WipeServerState) + m_DefaultMultipartChunkSize = Settings["chunksize"].AsUInt64(S3Storage::DefaultMultipartChunkSize); + + S3ClientOptions ClientOptions; + ClientOptions.BucketName = m_Bucket; + ClientOptions.Region = m_Region; + ClientOptions.Endpoint = m_Endpoint; + ClientOptions.PathStyle = m_PathStyle; + if (m_CredentialProvider) { - ZEN_DEBUG("Cleaning server state '{}'", m_Config.ServerStateDir); - CleanDirectory(m_Config.ServerStateDir, ForceRemoveReadOnlyFiles); - ZEN_DEBUG("Cleaning temp dir '{}'", m_Config.TempDir); - CleanDirectory(m_Config.TempDir, ForceRemoveReadOnlyFiles); 
+ ClientOptions.CredentialProvider = m_CredentialProvider; } + else + { + ClientOptions.Credentials = m_Credentials; + } + ClientOptions.HttpSettings.MaximumInMemoryDownloadSize = 16u * 1024u; + + m_Client = std::make_unique<S3Client>(ClientOptions); } std::unique_ptr<HydrationStrategyBase> -CreateHydrator(const HydrationConfig& Config) +S3Hydration::CreateHydrator(const HydrationConfig& Config) +{ + using namespace hydration_impl; + std::string KeyPrefix = m_KeyPrefixRoot.empty() ? std::string(Config.ModuleId) : fmt::format("{}/{}", m_KeyPrefixRoot, Config.ModuleId); + return std::make_unique<IncrementalHydrator>( + Config, + std::make_unique<S3Storage>(*m_Client, std::move(KeyPrefix), Config.TempDir, m_DefaultMultipartChunkSize)); +} + +std::unique_ptr<HydrationBase> +InitHydration(const HydrationBase::Configuration& Config) { + using namespace hydration_impl; + if (!Config.TargetSpecification.empty()) { - if (StrCaseCompare(Config.TargetSpecification.substr(0, FileHydratorPrefix.length()), FileHydratorPrefix) == 0) + if (StrCaseCompare(Config.TargetSpecification.substr(0, FileStorage::Prefix.length()), FileStorage::Prefix) == 0) { - std::unique_ptr<HydrationStrategyBase> Hydrator = std::make_unique<FileHydrator>(); - Hydrator->Configure(Config); - return Hydrator; + return std::make_unique<FileHydration>(Config); } - if (StrCaseCompare(Config.TargetSpecification.substr(0, S3HydratorPrefix.length()), S3HydratorPrefix) == 0) + if (StrCaseCompare(Config.TargetSpecification.substr(0, S3Storage::Prefix.length()), S3Storage::Prefix) == 0) { - std::unique_ptr<HydrationStrategyBase> Hydrator = std::make_unique<S3Hydrator>(); - Hydrator->Configure(Config); - return Hydrator; + return std::make_unique<S3Hydration>(Config); } - throw std::runtime_error(fmt::format("Unknown hydration strategy: {}", Config.TargetSpecification)); + throw zen::runtime_error("Unknown hydration strategy: {}", Config.TargetSpecification); } std::string_view Type = 
Config.Options["type"].AsString(); - if (Type == FileHydratorType) + if (Type == FileStorage::Type) { - std::unique_ptr<HydrationStrategyBase> Hydrator = std::make_unique<FileHydrator>(); - Hydrator->Configure(Config); - return Hydrator; + return std::make_unique<FileHydration>(Config); } - if (Type == S3HydratorType) + if (Type == S3Storage::Type) { - std::unique_ptr<HydrationStrategyBase> Hydrator = std::make_unique<S3Hydrator>(); - Hydrator->Configure(Config); - return Hydrator; + return std::make_unique<S3Hydration>(Config); } if (!Type.empty()) { @@ -659,60 +1409,14 @@ CreateHydrator(const HydrationConfig& Config) namespace { - /// Scoped RAII helper to set/restore a single environment variable within a test. - /// Used to configure AWS credentials for each S3 test's MinIO instance - /// without polluting the global environment. - struct ScopedEnvVar + struct TestThreading { - std::string m_Name; - std::optional<std::string> m_OldValue; // nullopt = was not set; "" = was set to empty string + WorkerThreadPool WorkerPool; + std::atomic<bool> AbortFlag{false}; + std::atomic<bool> PauseFlag{false}; + HydrationConfig::ThreadingOptions Options{.WorkerPool = &WorkerPool, .AbortFlag = &AbortFlag, .PauseFlag = &PauseFlag}; - ScopedEnvVar(std::string_view Name, std::string_view Value) : m_Name(Name) - { -# if ZEN_PLATFORM_WINDOWS - // Use the raw API so we can distinguish "not set" (ERROR_ENVVAR_NOT_FOUND) - // from "set to empty string" (returns 0 with no error). - char Buf[1]; - DWORD Len = GetEnvironmentVariableA(m_Name.c_str(), Buf, sizeof(Buf)); - if (Len == 0 && GetLastError() == ERROR_ENVVAR_NOT_FOUND) - { - m_OldValue = std::nullopt; - } - else - { - // Len == 0 with no error: variable exists but is empty. - // Len > sizeof(Buf): value is non-empty; Len is the required buffer size - // (including null terminator) - allocate and re-read. - std::string Old(Len == 0 ? 
0 : Len - 1, '\0'); - if (Len > sizeof(Buf)) - { - GetEnvironmentVariableA(m_Name.c_str(), Old.data(), Len); - } - m_OldValue = std::move(Old); - } - SetEnvironmentVariableA(m_Name.c_str(), std::string(Value).c_str()); -# else - // getenv returns nullptr when not set, "" when set to empty string. - const char* Existing = getenv(m_Name.c_str()); - m_OldValue = Existing ? std::optional<std::string>(Existing) : std::nullopt; - setenv(m_Name.c_str(), std::string(Value).c_str(), 1); -# endif - } - ~ScopedEnvVar() - { -# if ZEN_PLATFORM_WINDOWS - SetEnvironmentVariableA(m_Name.c_str(), m_OldValue.has_value() ? m_OldValue->c_str() : nullptr); -# else - if (m_OldValue.has_value()) - { - setenv(m_Name.c_str(), m_OldValue->c_str(), 1); - } - else - { - unsetenv(m_Name.c_str()); - } -# endif - } + explicit TestThreading(int ThreadCount) : WorkerPool(ThreadCount) {} }; /// Create a small file hierarchy under BaseDir: @@ -720,10 +1424,10 @@ namespace { /// subdir/file_b.bin /// subdir/nested/file_c.bin /// Returns a vector of (relative path, content) pairs for later verification. 
- std::vector<std::pair<std::filesystem::path, IoBuffer>> CreateTestTree(const std::filesystem::path& BaseDir) - { - std::vector<std::pair<std::filesystem::path, IoBuffer>> Files; + typedef std::vector<std::pair<std::filesystem::path, IoBuffer>> TestFileList; + TestFileList AddTestFiles(const std::filesystem::path& BaseDir, TestFileList& Files) + { auto AddFile = [&](std::filesystem::path RelPath, IoBuffer Content) { std::filesystem::path FullPath = BaseDir / RelPath; CreateDirectories(FullPath.parent_path()); @@ -737,9 +1441,33 @@ namespace { AddFile("subdir/nested/file_d.bin", CreateSemiRandomBlob(512)); AddFile("subdir/nested/file_e.bin", CreateSemiRandomBlob(512)); AddFile("subdir/nested/file_f.bin", CreateSemiRandomBlob(512)); + + return Files; + } + + TestFileList CreateSmallTestTree(const std::filesystem::path& BaseDir) + { + TestFileList Files; + AddTestFiles(BaseDir, Files); + return Files; + } + + TestFileList CreateTestTree(const std::filesystem::path& BaseDir) + { + TestFileList Files; + AddTestFiles(BaseDir, Files); + + auto AddFile = [&](std::filesystem::path RelPath, IoBuffer Content) { + std::filesystem::path FullPath = BaseDir / RelPath; + CreateDirectories(FullPath.parent_path()); + WriteFile(FullPath, Content); + Files.emplace_back(std::move(RelPath), std::move(Content)); + }; + AddFile("subdir/nested/medium.bulk", CreateSemiRandomBlob(256u * 1024u)); AddFile("subdir/nested/big.bulk", CreateSemiRandomBlob(512u * 1024u)); AddFile("subdir/nested/huge.bulk", CreateSemiRandomBlob(9u * 1024u * 1024u)); + AddFile("subdir/nested/biggest.bulk", CreateSemiRandomBlob(63u * 1024u * 1024u)); return Files; } @@ -777,35 +1505,27 @@ TEST_CASE("hydration.file.dehydrate_hydrate") CreateDirectories(HydrationTemp); const std::string ModuleId = "testmodule"; - auto TestFiles = CreateTestTree(ServerStateDir); + auto TestFiles = CreateSmallTestTree(ServerStateDir); + + auto Hydration = InitHydration({.TargetSpecification = "file://" + HydrationStore.string()}); - 
HydrationConfig Config; - Config.ServerStateDir = ServerStateDir; - Config.TempDir = HydrationTemp; - Config.ModuleId = ModuleId; - Config.TargetSpecification = "file://" + HydrationStore.string(); + HydrationConfig Config{.ServerStateDir = ServerStateDir, .TempDir = HydrationTemp, .ModuleId = ModuleId}; // Dehydrate: copy server state to file store - { - std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config); - Hydrator->Dehydrate(); - } + Hydration->CreateHydrator(Config)->Dehydrate(CbObject()); // Verify the module folder exists in the store and ServerStateDir was wiped CHECK(std::filesystem::exists(HydrationStore / ModuleId)); CHECK(std::filesystem::is_empty(ServerStateDir)); // Hydrate: restore server state from file store - { - std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config); - Hydrator->Hydrate(); - } + Hydration->CreateHydrator(Config)->Hydrate(); // Verify restored contents match the original VerifyTree(ServerStateDir, TestFiles); } -TEST_CASE("hydration.file.dehydrate_cleans_server_state") +TEST_CASE("hydration.file.hydrate_overwrites_existing_state") { ScopedTemporaryDirectory TempDir; @@ -816,22 +1536,25 @@ TEST_CASE("hydration.file.dehydrate_cleans_server_state") CreateDirectories(HydrationStore); CreateDirectories(HydrationTemp); - CreateTestTree(ServerStateDir); + auto TestFiles = CreateSmallTestTree(ServerStateDir); - HydrationConfig Config; - Config.ServerStateDir = ServerStateDir; - Config.TempDir = HydrationTemp; - Config.ModuleId = "testmodule"; - Config.TargetSpecification = "file://" + HydrationStore.string(); + auto Hydration = InitHydration({.TargetSpecification = "file://" + HydrationStore.string()}); - std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config); - Hydrator->Dehydrate(); + HydrationConfig Config{.ServerStateDir = ServerStateDir, .TempDir = HydrationTemp, .ModuleId = "testmodule"}; - // FileHydrator::Dehydrate() must wipe ServerStateDir when done - 
CHECK(std::filesystem::is_empty(ServerStateDir)); + Hydration->CreateHydrator(Config)->Dehydrate(CbObject()); + + // Put a stale file in ServerStateDir to simulate leftover state + WriteFile(ServerStateDir / "stale.bin", CreateSemiRandomBlob(256)); + + // Hydrate - must wipe stale file and restore original + Hydration->CreateHydrator(Config)->Hydrate(); + + CHECK_FALSE(std::filesystem::exists(ServerStateDir / "stale.bin")); + VerifyTree(ServerStateDir, TestFiles); } -TEST_CASE("hydration.file.hydrate_overwrites_existing_state") +TEST_CASE("hydration.file.excluded_files_not_dehydrated") { ScopedTemporaryDirectory TempDir; @@ -842,31 +1565,70 @@ TEST_CASE("hydration.file.hydrate_overwrites_existing_state") CreateDirectories(HydrationStore); CreateDirectories(HydrationTemp); - auto TestFiles = CreateTestTree(ServerStateDir); + auto TestFiles = CreateSmallTestTree(ServerStateDir); - HydrationConfig Config; - Config.ServerStateDir = ServerStateDir; - Config.TempDir = HydrationTemp; - Config.ModuleId = "testmodule"; - Config.TargetSpecification = "file://" + HydrationStore.string(); + // Add files that the dehydrator should skip + WriteFile(ServerStateDir / "reserve.gc", CreateSemiRandomBlob(64)); + CreateDirectories(ServerStateDir / ".sentry-native"); + WriteFile(ServerStateDir / ".sentry-native" / "db.lock", CreateSemiRandomBlob(32)); + WriteFile(ServerStateDir / ".sentry-native" / "breadcrumb.json", CreateSemiRandomBlob(128)); - // Dehydrate the original state - { - std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config); - Hydrator->Dehydrate(); - } + auto Hydration = InitHydration({.TargetSpecification = "file://" + HydrationStore.string()}); - // Put a stale file in ServerStateDir to simulate leftover state - WriteFile(ServerStateDir / "stale.bin", CreateSemiRandomBlob(256)); + HydrationConfig Config{.ServerStateDir = ServerStateDir, .TempDir = HydrationTemp, .ModuleId = "testmodule_excl"}; - // Hydrate - must wipe stale file and restore original - 
{ - std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config); - Hydrator->Hydrate(); - } + Hydration->CreateHydrator(Config)->Dehydrate(CbObject()); - CHECK_FALSE(std::filesystem::exists(ServerStateDir / "stale.bin")); + // Hydrate into a clean directory + CleanDirectory(ServerStateDir, true); + Hydration->CreateHydrator(Config)->Hydrate(); + + // Normal files must be restored VerifyTree(ServerStateDir, TestFiles); + // Excluded files must NOT be restored + CHECK_FALSE(std::filesystem::exists(ServerStateDir / "reserve.gc")); + CHECK_FALSE(std::filesystem::exists(ServerStateDir / ".sentry-native")); +} + +// --------------------------------------------------------------------------- +// FileHydrator obliterate test +// --------------------------------------------------------------------------- + +TEST_CASE("hydration.file.obliterate") +{ + ScopedTemporaryDirectory TempDir; + + std::filesystem::path ServerStateDir = TempDir.Path() / "server_state"; + std::filesystem::path HydrationStore = TempDir.Path() / "hydration_store"; + std::filesystem::path HydrationTemp = TempDir.Path() / "hydration_temp"; + CreateDirectories(ServerStateDir); + CreateDirectories(HydrationStore); + CreateDirectories(HydrationTemp); + + const std::string ModuleId = "obliterate_test"; + CreateSmallTestTree(ServerStateDir); + + auto Hydration = InitHydration({.TargetSpecification = "file://" + HydrationStore.string()}); + + HydrationConfig Config{.ServerStateDir = ServerStateDir, .TempDir = HydrationTemp, .ModuleId = ModuleId}; + + // Dehydrate so the backend store has data + Hydration->CreateHydrator(Config)->Dehydrate(CbObject()); + CHECK(std::filesystem::exists(HydrationStore / ModuleId)); + + // Put some files back in ServerStateDir and TempDir to verify cleanup + CreateSmallTestTree(ServerStateDir); + WriteFile(HydrationTemp / "leftover.tmp", CreateSemiRandomBlob(64)); + + // Obliterate + Hydration->CreateHydrator(Config)->Obliterate(); + + // Backend store directory deleted 
+ CHECK_FALSE(std::filesystem::exists(HydrationStore / ModuleId)); + // ServerStateDir cleaned + CHECK(std::filesystem::is_empty(ServerStateDir)); + // TempDir cleaned + CHECK(std::filesystem::is_empty(HydrationTemp)); } // --------------------------------------------------------------------------- @@ -883,6 +1645,8 @@ TEST_CASE("hydration.file.concurrent") std::filesystem::path HydrationStore = TempDir.Path() / "hydration_store"; CreateDirectories(HydrationStore); + TestThreading Threading(8); + struct ModuleData { HydrationConfig Config; @@ -890,6 +1654,8 @@ TEST_CASE("hydration.file.concurrent") }; std::vector<ModuleData> Modules(kModuleCount); + auto Hydration = InitHydration({.TargetSpecification = "file://" + HydrationStore.string()}); + for (int I = 0; I < kModuleCount; ++I) { std::string ModuleId = fmt::format("file_concurrent_{}", I); @@ -898,11 +1664,11 @@ TEST_CASE("hydration.file.concurrent") CreateDirectories(StateDir); CreateDirectories(TempPath); - Modules[I].Config.ServerStateDir = StateDir; - Modules[I].Config.TempDir = TempPath; - Modules[I].Config.ModuleId = ModuleId; - Modules[I].Config.TargetSpecification = "file://" + HydrationStore.string(); - Modules[I].Files = CreateTestTree(StateDir); + Modules[I].Config.ServerStateDir = StateDir; + Modules[I].Config.TempDir = TempPath; + Modules[I].Config.ModuleId = ModuleId; + Modules[I].Config.Threading = Threading.Options; + Modules[I].Files = CreateSmallTestTree(StateDir); } // Concurrent dehydrate @@ -914,9 +1680,8 @@ TEST_CASE("hydration.file.concurrent") for (int I = 0; I < kModuleCount; ++I) { - Work.ScheduleWork(Pool, [&Config = Modules[I].Config](std::atomic<bool>&) { - std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config); - Hydrator->Dehydrate(); + Work.ScheduleWork(Pool, [&Hydration, &Config = Modules[I].Config](std::atomic<bool>&) { + Hydration->CreateHydrator(Config)->Dehydrate(CbObject()); }); } Work.Wait(); @@ -932,9 +1697,8 @@ TEST_CASE("hydration.file.concurrent") for 
(int I = 0; I < kModuleCount; ++I) { - Work.ScheduleWork(Pool, [&Config = Modules[I].Config](std::atomic<bool>&) { - std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config); - Hydrator->Hydrate(); + Work.ScheduleWork(Pool, [&Hydration, &Config = Modules[I].Config](std::atomic<bool>&) { + Hydration->CreateHydrator(Config)->Hydrate(); }); } Work.Wait(); @@ -951,76 +1715,13 @@ TEST_CASE("hydration.file.concurrent") // --------------------------------------------------------------------------- // S3Hydrator tests // -// Each test case spawns its own local MinIO instance (self-contained, no external setup needed). +// Each test case spawns a local MinIO instance (self-contained, no external setup needed). // The MinIO binary must be present in the same directory as the test executable (copied by xmake). // --------------------------------------------------------------------------- TEST_CASE("hydration.s3.dehydrate_hydrate") { MinioProcessOptions MinioOpts; - MinioOpts.Port = 19010; - MinioProcess Minio(MinioOpts); - Minio.SpawnMinioServer(); - Minio.CreateBucket("zen-hydration-test"); - - ScopedEnvVar EnvAccessKey("AWS_ACCESS_KEY_ID", Minio.RootUser()); - ScopedEnvVar EnvSecretKey("AWS_SECRET_ACCESS_KEY", Minio.RootPassword()); - - ScopedTemporaryDirectory TempDir; - - std::filesystem::path ServerStateDir = TempDir.Path() / "server_state"; - std::filesystem::path HydrationTemp = TempDir.Path() / "hydration_temp"; - CreateDirectories(ServerStateDir); - CreateDirectories(HydrationTemp); - - const std::string ModuleId = "s3test_roundtrip"; - auto TestFiles = CreateTestTree(ServerStateDir); - - HydrationConfig Config; - Config.ServerStateDir = ServerStateDir; - Config.TempDir = HydrationTemp; - Config.ModuleId = ModuleId; - std::string ConfigJson = - fmt::format(R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test","endpoint":"{}","path-style":true}}}})", - Minio.Endpoint()); - std::string ParseError; - CbFieldIterator Root = 
LoadCompactBinaryFromJson(ConfigJson, ParseError); - ZEN_ASSERT(ParseError.empty() && Root.IsObject()); - Config.Options = std::move(Root).AsObject(); - - // Dehydrate: upload server state to MinIO - { - std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config); - Hydrator->Dehydrate(); - } - - // Wipe server state - CleanDirectory(ServerStateDir, true); - CHECK(std::filesystem::is_empty(ServerStateDir)); - - // Hydrate: download from MinIO back to server state - { - std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config); - Hydrator->Hydrate(); - } - - // Verify restored contents match the original - VerifyTree(ServerStateDir, TestFiles); -} - -TEST_CASE("hydration.s3.current_state_json_selects_latest_folder") -{ - // Each Dehydrate() uploads files to a new timestamp-named folder and then overwrites - // current-state.json to point at that folder. Old folders are NOT deleted. - // Hydrate() must read current-state.json to determine which folder to restore from. - // - // This test verifies that: - // 1. After two dehydrations, Hydrate() restores from the second snapshot, not the first, - // confirming that current-state.json was updated between dehydrations. - // 2. current-state.json is updated to point at the second (latest) folder. - // 3. Hydrate() restores the v2 snapshot (identified by v2marker.bin), NOT the v1 snapshot. 
- - MinioProcessOptions MinioOpts; MinioOpts.Port = 19011; MinioProcess Minio(MinioOpts); Minio.SpawnMinioServer(); @@ -1036,12 +1737,7 @@ TEST_CASE("hydration.s3.current_state_json_selects_latest_folder") CreateDirectories(ServerStateDir); CreateDirectories(HydrationTemp); - const std::string ModuleId = "s3test_folder_select"; - - HydrationConfig Config; - Config.ServerStateDir = ServerStateDir; - Config.TempDir = HydrationTemp; - Config.ModuleId = ModuleId; + HydrationBase::Configuration BaseConfig; { std::string ConfigJson = fmt::format(R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test","endpoint":"{}","path-style":true}}}})", @@ -1049,108 +1745,37 @@ TEST_CASE("hydration.s3.current_state_json_selects_latest_folder") std::string ParseError; CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError); ZEN_ASSERT(ParseError.empty() && Root.IsObject()); - Config.Options = std::move(Root).AsObject(); + BaseConfig.Options = std::move(Root).AsObject(); } + auto Hydration = InitHydration(BaseConfig); - // v1: dehydrate without a marker file - CreateTestTree(ServerStateDir); - { - std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config); - Hydrator->Dehydrate(); - } + HydrationConfig Config{.ServerStateDir = ServerStateDir, .TempDir = HydrationTemp, .ModuleId = "s3test_roundtrip"}; - // ServerStateDir is now empty. Wait so the v2 timestamp folder name is strictly later - // (timestamp resolution is 1 ms, but macOS scheduler granularity requires a larger margin). - Sleep(100); + // Hydrate with no prior S3 state (first-boot path). Pre-populate ServerStateDir + // with a stale file to confirm the cleanup branch wipes it. 
+ WriteFile(ServerStateDir / "stale.bin", CreateSemiRandomBlob(256)); + Hydration->CreateHydrator(Config)->Hydrate(); + CHECK(std::filesystem::is_empty(ServerStateDir)); + + // v1: dehydrate without a marker file + CreateSmallTestTree(ServerStateDir); + Hydration->CreateHydrator(Config)->Dehydrate(CbObject()); // v2: dehydrate WITH a marker file that only v2 has - CreateTestTree(ServerStateDir); + CreateSmallTestTree(ServerStateDir); WriteFile(ServerStateDir / "v2marker.bin", CreateSemiRandomBlob(64)); - { - std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config); - Hydrator->Dehydrate(); - } + Hydration->CreateHydrator(Config)->Dehydrate(CbObject()); - // Hydrate must restore v2 (current-state.json points to the v2 folder) + // Hydrate must restore v2 (the latest dehydrated state) CleanDirectory(ServerStateDir, true); - { - std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config); - Hydrator->Hydrate(); - } + Hydration->CreateHydrator(Config)->Hydrate(); - // v2 marker must be present - confirms current-state.json pointed to the v2 folder + // v2 marker must be present - confirms the second dehydration overwrote the first CHECK(std::filesystem::exists(ServerStateDir / "v2marker.bin")); - // Subdirectory hierarchy must also be intact CHECK(std::filesystem::exists(ServerStateDir / "subdir" / "file_b.bin")); CHECK(std::filesystem::exists(ServerStateDir / "subdir" / "nested" / "file_c.bin")); } -TEST_CASE("hydration.s3.module_isolation") -{ - // Two independent modules dehydrate/hydrate without interfering with each other. - // Uses VerifyTree with per-module byte content to detect cross-module data mixing. 
- MinioProcessOptions MinioOpts; - MinioOpts.Port = 19012; - MinioProcess Minio(MinioOpts); - Minio.SpawnMinioServer(); - Minio.CreateBucket("zen-hydration-test"); - - ScopedEnvVar EnvAccessKey("AWS_ACCESS_KEY_ID", Minio.RootUser()); - ScopedEnvVar EnvSecretKey("AWS_SECRET_ACCESS_KEY", Minio.RootPassword()); - - ScopedTemporaryDirectory TempDir; - - struct ModuleData - { - HydrationConfig Config; - std::vector<std::pair<std::filesystem::path, IoBuffer>> Files; - }; - - std::vector<ModuleData> Modules; - for (const char* ModuleId : {"s3test_iso_a", "s3test_iso_b"}) - { - std::filesystem::path StateDir = TempDir.Path() / ModuleId / "state"; - std::filesystem::path TempPath = TempDir.Path() / ModuleId / "temp"; - CreateDirectories(StateDir); - CreateDirectories(TempPath); - - ModuleData Data; - Data.Config.ServerStateDir = StateDir; - Data.Config.TempDir = TempPath; - Data.Config.ModuleId = ModuleId; - { - std::string ConfigJson = - fmt::format(R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test","endpoint":"{}","path-style":true}}}})", - Minio.Endpoint()); - std::string ParseError; - CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError); - ZEN_ASSERT(ParseError.empty() && Root.IsObject()); - Data.Config.Options = std::move(Root).AsObject(); - } - Data.Files = CreateTestTree(StateDir); - - std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Data.Config); - Hydrator->Dehydrate(); - - Modules.push_back(std::move(Data)); - } - - for (ModuleData& Module : Modules) - { - CleanDirectory(Module.Config.ServerStateDir, true); - std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Module.Config); - Hydrator->Hydrate(); - - // Each module's files must be independently restorable with correct byte content. - // If S3 key prefixes were mixed up, CreateSemiRandomBlob content would differ. 
- VerifyTree(Module.Config.ServerStateDir, Module.Files); - } -} - -// --------------------------------------------------------------------------- -// S3Hydrator concurrent test -// --------------------------------------------------------------------------- - TEST_CASE("hydration.s3.concurrent") { // N modules dehydrate and hydrate concurrently against MinIO. @@ -1164,9 +1789,11 @@ TEST_CASE("hydration.s3.concurrent") ScopedEnvVar EnvAccessKey("AWS_ACCESS_KEY_ID", Minio.RootUser()); ScopedEnvVar EnvSecretKey("AWS_SECRET_ACCESS_KEY", Minio.RootPassword()); - constexpr int kModuleCount = 16; + constexpr int kModuleCount = 6; constexpr int kThreadCount = 4; + TestThreading Threading(kThreadCount); + ScopedTemporaryDirectory TempDir; struct ModuleData @@ -1176,6 +1803,18 @@ TEST_CASE("hydration.s3.concurrent") }; std::vector<ModuleData> Modules(kModuleCount); + HydrationBase::Configuration BaseConfig; + { + std::string ConfigJson = + fmt::format(R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test","endpoint":"{}","path-style":true}}}})", + Minio.Endpoint()); + std::string ParseError; + CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError); + ZEN_ASSERT(ParseError.empty() && Root.IsObject()); + BaseConfig.Options = std::move(Root).AsObject(); + } + auto Hydration = InitHydration(BaseConfig); + for (int I = 0; I < kModuleCount; ++I) { std::string ModuleId = fmt::format("s3_concurrent_{}", I); @@ -1187,16 +1826,8 @@ TEST_CASE("hydration.s3.concurrent") Modules[I].Config.ServerStateDir = StateDir; Modules[I].Config.TempDir = TempPath; Modules[I].Config.ModuleId = ModuleId; - { - std::string ConfigJson = - fmt::format(R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test","endpoint":"{}","path-style":true}}}})", - Minio.Endpoint()); - std::string ParseError; - CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError); - ZEN_ASSERT(ParseError.empty() && Root.IsObject()); - Modules[I].Config.Options = 
std::move(Root).AsObject(); - } - Modules[I].Files = CreateTestTree(StateDir); + Modules[I].Config.Threading = Threading.Options; + Modules[I].Files = CreateTestTree(StateDir); } // Concurrent dehydrate @@ -1208,9 +1839,8 @@ TEST_CASE("hydration.s3.concurrent") for (int I = 0; I < kModuleCount; ++I) { - Work.ScheduleWork(Pool, [&Config = Modules[I].Config](std::atomic<bool>&) { - std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config); - Hydrator->Dehydrate(); + Work.ScheduleWork(Pool, [&Hydration, &Config = Modules[I].Config](std::atomic<bool>&) { + Hydration->CreateHydrator(Config)->Dehydrate(CbObject()); }); } Work.Wait(); @@ -1226,10 +1856,9 @@ TEST_CASE("hydration.s3.concurrent") for (int I = 0; I < kModuleCount; ++I) { - Work.ScheduleWork(Pool, [&Config = Modules[I].Config](std::atomic<bool>&) { + Work.ScheduleWork(Pool, [&Hydration, &Config = Modules[I].Config](std::atomic<bool>&) { CleanDirectory(Config.ServerStateDir, true); - std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config); - Hydrator->Hydrate(); + Hydration->CreateHydrator(Config)->Hydrate(); }); } Work.Wait(); @@ -1243,17 +1872,10 @@ TEST_CASE("hydration.s3.concurrent") } } -// --------------------------------------------------------------------------- -// S3Hydrator: no prior state (first-boot path) -// --------------------------------------------------------------------------- - -TEST_CASE("hydration.s3.no_prior_state") +TEST_CASE("hydration.s3.obliterate") { - // Hydrate() against an empty bucket (first-boot scenario) must leave ServerStateDir empty. - // The "No state found in S3" path goes through the error-cleanup branch, which wipes - // ServerStateDir to ensure no partial or stale content is left for the server to start on. 
MinioProcessOptions MinioOpts; - MinioOpts.Port = 19014; + MinioOpts.Port = 19019; MinioProcess Minio(MinioOpts); Minio.SpawnMinioServer(); Minio.CreateBucket("zen-hydration-test"); @@ -1268,13 +1890,9 @@ TEST_CASE("hydration.s3.no_prior_state") CreateDirectories(ServerStateDir); CreateDirectories(HydrationTemp); - // Pre-populate ServerStateDir to confirm the wipe actually runs. - WriteFile(ServerStateDir / "stale.bin", CreateSemiRandomBlob(256)); + const std::string ModuleId = "s3test_obliterate"; - HydrationConfig Config; - Config.ServerStateDir = ServerStateDir; - Config.TempDir = HydrationTemp; - Config.ModuleId = "s3test_no_prior"; + HydrationBase::Configuration BaseConfig; { std::string ConfigJson = fmt::format(R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test","endpoint":"{}","path-style":true}}}})", @@ -1282,26 +1900,46 @@ TEST_CASE("hydration.s3.no_prior_state") std::string ParseError; CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError); ZEN_ASSERT(ParseError.empty() && Root.IsObject()); - Config.Options = std::move(Root).AsObject(); + BaseConfig.Options = std::move(Root).AsObject(); } + auto Hydration = InitHydration(BaseConfig); + + HydrationConfig Config{.ServerStateDir = ServerStateDir, .TempDir = HydrationTemp, .ModuleId = ModuleId}; + + // Dehydrate to populate backend + CreateSmallTestTree(ServerStateDir); + Hydration->CreateHydrator(Config)->Dehydrate(CbObject()); + + auto ListModuleObjects = [&]() { + S3ClientOptions Opts; + Opts.BucketName = "zen-hydration-test"; + Opts.Endpoint = Minio.Endpoint(); + Opts.PathStyle = true; + Opts.Credentials.AccessKeyId = Minio.RootUser(); + Opts.Credentials.SecretAccessKey = Minio.RootPassword(); + S3Client Client(Opts); + return Client.ListObjects(ModuleId + "/"); + }; + + // Verify objects exist in S3 + CHECK(!ListModuleObjects().Objects.empty()); - std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config); - Hydrator->Hydrate(); + // Re-populate ServerStateDir 
and TempDir for cleanup verification + CreateSmallTestTree(ServerStateDir); + WriteFile(HydrationTemp / "leftover.tmp", CreateSemiRandomBlob(64)); - // ServerStateDir must be empty: the error path wipes it to prevent a server start - // against stale or partially-installed content. + // Obliterate + Hydration->CreateHydrator(Config)->Obliterate(); + + // Verify S3 objects deleted + CHECK(ListModuleObjects().Objects.empty()); + // Local directories cleaned CHECK(std::filesystem::is_empty(ServerStateDir)); + CHECK(std::filesystem::is_empty(HydrationTemp)); } -// --------------------------------------------------------------------------- -// S3Hydrator: bucket path prefix in TargetSpecification -// --------------------------------------------------------------------------- - -TEST_CASE("hydration.s3.path_prefix") +TEST_CASE("hydration.s3.config_overrides") { - // TargetSpecification of the form "s3://bucket/some/prefix" stores objects under - // "some/prefix/<ModuleId>/..." rather than directly under "<ModuleId>/...". - // Tests the second branch of the m_KeyPrefix calculation in S3Hydrator::Configure(). MinioProcessOptions MinioOpts; MinioOpts.Port = 19015; MinioProcess Minio(MinioOpts); @@ -1318,88 +1956,298 @@ TEST_CASE("hydration.s3.path_prefix") CreateDirectories(ServerStateDir); CreateDirectories(HydrationTemp); - std::vector<std::pair<std::filesystem::path, IoBuffer>> TestFiles = CreateTestTree(ServerStateDir); + // Path prefix: "s3://bucket/some/prefix" stores objects under + // "some/prefix/<ModuleId>/..." rather than directly under "<ModuleId>/...". 
+ { + auto TestFiles = CreateSmallTestTree(ServerStateDir); - HydrationConfig Config; - Config.ServerStateDir = ServerStateDir; - Config.TempDir = HydrationTemp; - Config.ModuleId = "s3test_prefix"; + HydrationBase::Configuration BaseConfig; + { + std::string ConfigJson = fmt::format( + R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test/team/project","endpoint":"{}","path-style":true}}}})", + Minio.Endpoint()); + std::string ParseError; + CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError); + ZEN_ASSERT(ParseError.empty() && Root.IsObject()); + BaseConfig.Options = std::move(Root).AsObject(); + } + auto Hydration = InitHydration(BaseConfig); + + HydrationConfig Config{.ServerStateDir = ServerStateDir, .TempDir = HydrationTemp, .ModuleId = "s3test_prefix"}; + + Hydration->CreateHydrator(Config)->Dehydrate(CbObject()); + + CleanDirectory(ServerStateDir, true); + + Hydration->CreateHydrator(Config)->Hydrate(); + + VerifyTree(ServerStateDir, TestFiles); + } + + // Region override: 'region' in Options["settings"] takes precedence over AWS_DEFAULT_REGION. + // AWS_DEFAULT_REGION is set to a bogus value; hydration must succeed using the region from Options. 
+ { + CleanDirectory(ServerStateDir, true); + auto TestFiles = CreateSmallTestTree(ServerStateDir); + + ScopedEnvVar EnvRegion("AWS_DEFAULT_REGION", "wrong-region"); + + HydrationBase::Configuration BaseConfig; + { + std::string ConfigJson = fmt::format( + R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test","endpoint":"{}","path-style":true,"region":"us-east-1"}}}})", + Minio.Endpoint()); + std::string ParseError; + CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError); + ZEN_ASSERT(ParseError.empty() && Root.IsObject()); + BaseConfig.Options = std::move(Root).AsObject(); + } + auto Hydration = InitHydration(BaseConfig); + + HydrationConfig Config{.ServerStateDir = ServerStateDir, .TempDir = HydrationTemp, .ModuleId = "s3test_region_override"}; + + Hydration->CreateHydrator(Config)->Dehydrate(CbObject()); + + CleanDirectory(ServerStateDir, true); + + Hydration->CreateHydrator(Config)->Hydrate(); + + VerifyTree(ServerStateDir, TestFiles); + } +} + +TEST_CASE("hydration.s3.dehydrate_hydrate.performance" * doctest::skip()) +{ + MinioProcessOptions MinioOpts; + MinioOpts.Port = 19010; + MinioProcess Minio(MinioOpts); + Minio.SpawnMinioServer(); + Minio.CreateBucket("zen-hydration-test"); + + ScopedEnvVar EnvAccessKey("AWS_ACCESS_KEY_ID", Minio.RootUser()); + ScopedEnvVar EnvSecretKey("AWS_SECRET_ACCESS_KEY", Minio.RootPassword()); + + ScopedTemporaryDirectory TempDir; + + std::filesystem::path ServerStateDir = TempDir.Path() / "server_state"; + std::filesystem::path HydrationTemp = TempDir.Path() / "hydration_temp"; + CreateDirectories(ServerStateDir); + CreateDirectories(HydrationTemp); + + const std::string ModuleId = "s3test_performance"; + CopyTree("E:\\Dev\\hub\\brainrot\\20260402-225355-508", ServerStateDir, {.EnableClone = true}); + // auto TestFiles = CreateTestTree(ServerStateDir); + + TestThreading Threading(4); + + HydrationBase::Configuration BaseConfig; { std::string ConfigJson = - 
fmt::format(R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test/team/project","endpoint":"{}","path-style":true}}}})", + fmt::format(R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test","endpoint":"{}","path-style":true}}}})", Minio.Endpoint()); std::string ParseError; CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError); ZEN_ASSERT(ParseError.empty() && Root.IsObject()); - Config.Options = std::move(Root).AsObject(); + BaseConfig.Options = std::move(Root).AsObject(); } + auto Hydration = InitHydration(BaseConfig); - { - std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config); - Hydrator->Dehydrate(); - } + HydrationConfig Config{.ServerStateDir = ServerStateDir, + .TempDir = HydrationTemp, + .ModuleId = ModuleId, + .Threading = Threading.Options}; - CleanDirectory(ServerStateDir, true); + // Dehydrate: upload server state to MinIO + ZEN_INFO("============== DEHYDRATE =============="); + Hydration->CreateHydrator(Config)->Dehydrate(CbObject()); + for (size_t I = 0; I < 1; I++) { - std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config); - Hydrator->Hydrate(); + // Wipe server state + CleanDirectory(ServerStateDir, true); + CHECK(std::filesystem::is_empty(ServerStateDir)); + + // Hydrate: download from MinIO back to server state + ZEN_INFO("=============== HYDRATE ==============="); + Hydration->CreateHydrator(Config)->Hydrate(); } +} + +//#define REAL_DATA_PATH "E:\\Dev\\hub\\zenddc\\Zen" +//#define REAL_DATA_PATH "E:\\Dev\\hub\\brainrot\\20260402-225355-508" + +TEST_CASE("hydration.file.incremental") +{ + std::filesystem::path TmpPath; +# ifdef REAL_DATA_PATH + TmpPath = std::filesystem::path(REAL_DATA_PATH).parent_path() / "hub"; +# endif + ScopedTemporaryDirectory TempDir(TmpPath); + + std::filesystem::path ServerStateDir = TempDir.Path() / "server_state"; + std::filesystem::path HydrationStore = TempDir.Path() / "hydration_store"; + std::filesystem::path HydrationTemp = TempDir.Path() / 
"hydration_temp"; + CreateDirectories(ServerStateDir); + CreateDirectories(HydrationStore); + CreateDirectories(HydrationTemp); + + const std::string ModuleId = "testmodule"; + // auto TestFiles = CreateTestTree(ServerStateDir); + + TestThreading Threading(4); + + auto Hydration = InitHydration({.TargetSpecification = "file://" + HydrationStore.string()}); + + HydrationConfig Config{.ServerStateDir = ServerStateDir, + .TempDir = HydrationTemp, + .ModuleId = ModuleId, + .Threading = Threading.Options}; + + // Hydrate with no prior state + CbObject HydrationState = Hydration->CreateHydrator(Config)->Hydrate(); + CHECK_FALSE(HydrationState); + +# ifdef REAL_DATA_PATH + ZEN_INFO("Writing state data..."); + CopyTree(REAL_DATA_PATH, ServerStateDir, {.EnableClone = true}); + ZEN_INFO("Writing state data complete"); +# else + // Create test files and dehydrate + auto TestFiles = CreateTestTree(ServerStateDir); +# endif + Hydration->CreateHydrator(Config)->Dehydrate(HydrationState); + CHECK(std::filesystem::is_empty(ServerStateDir)); + + // Hydrate: restore from file store + HydrationState = Hydration->CreateHydrator(Config)->Hydrate(); +# ifndef REAL_DATA_PATH + VerifyTree(ServerStateDir, TestFiles); +# endif + // Dehydrate again with cached state (should skip re-uploading unchanged files) + Hydration->CreateHydrator(Config)->Dehydrate(HydrationState); + CHECK(std::filesystem::is_empty(ServerStateDir)); + + // Hydrate one more time to confirm second dehydrate produced valid state + HydrationState = Hydration->CreateHydrator(Config)->Hydrate(); + // Replace files and dehydrate + TestFiles = CreateTestTree(ServerStateDir); + Hydration->CreateHydrator(Config)->Dehydrate(HydrationState); + + // Hydrate one more time to confirm second dehydrate produced valid state + HydrationState = Hydration->CreateHydrator(Config)->Hydrate(); +# ifndef REAL_DATA_PATH VerifyTree(ServerStateDir, TestFiles); +# endif // 0 + + // Dehydrate, nothing touched - no hashing, no upload + 
Hydration->CreateHydrator(Config)->Dehydrate(HydrationState); } -TEST_CASE("hydration.s3.options_region_override") -{ - // Verify that 'region' in Options["settings"] takes precedence over AWS_DEFAULT_REGION env var. - // AWS_DEFAULT_REGION is set to a bogus value; hydration must succeed using the region from Options. +// --------------------------------------------------------------------------- +// S3Storage test +// --------------------------------------------------------------------------- +TEST_CASE("hydration.s3.incremental") +{ MinioProcessOptions MinioOpts; - MinioOpts.Port = 19016; + MinioOpts.Port = 19017; MinioProcess Minio(MinioOpts); Minio.SpawnMinioServer(); Minio.CreateBucket("zen-hydration-test"); ScopedEnvVar EnvAccessKey("AWS_ACCESS_KEY_ID", Minio.RootUser()); ScopedEnvVar EnvSecretKey("AWS_SECRET_ACCESS_KEY", Minio.RootPassword()); - ScopedEnvVar EnvRegion("AWS_DEFAULT_REGION", "wrong-region"); - ScopedTemporaryDirectory TempDir; + std::filesystem::path TmpPath; +# ifdef REAL_DATA_PATH + TmpPath = std::filesystem::path(REAL_DATA_PATH).parent_path() / "hub"; +# endif + ScopedTemporaryDirectory TempDir(TmpPath); std::filesystem::path ServerStateDir = TempDir.Path() / "server_state"; std::filesystem::path HydrationTemp = TempDir.Path() / "hydration_temp"; CreateDirectories(ServerStateDir); CreateDirectories(HydrationTemp); - auto TestFiles = CreateTestTree(ServerStateDir); + const std::string ModuleId = "s3test_incremental"; + + TestThreading Threading(8); - HydrationConfig Config; - Config.ServerStateDir = ServerStateDir; - Config.TempDir = HydrationTemp; - Config.ModuleId = "s3test_region_override"; + HydrationBase::Configuration BaseConfig; { - std::string ConfigJson = fmt::format( - R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test","endpoint":"{}","path-style":true,"region":"us-east-1"}}}})", - Minio.Endpoint()); + std::string ConfigJson = + 
fmt::format(R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test","endpoint":"{}","path-style":true}}}})", + Minio.Endpoint()); std::string ParseError; CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError); ZEN_ASSERT(ParseError.empty() && Root.IsObject()); - Config.Options = std::move(Root).AsObject(); + BaseConfig.Options = std::move(Root).AsObject(); } + auto Hydration = InitHydration(BaseConfig); - { - std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config); - Hydrator->Dehydrate(); - } + HydrationConfig Config{.ServerStateDir = ServerStateDir, + .TempDir = HydrationTemp, + .ModuleId = ModuleId, + .Threading = Threading.Options}; - CleanDirectory(ServerStateDir, true); + // Hydrate with no prior state + CbObject HydrationState = Hydration->CreateHydrator(Config)->Hydrate(); + CHECK_FALSE(HydrationState); + +# ifdef REAL_DATA_PATH + ZEN_INFO("Writing state data..."); + CopyTree(REAL_DATA_PATH, ServerStateDir, {.EnableClone = true}); + ZEN_INFO("Writing state data complete"); +# else + // Create test files and dehydrate + auto TestFiles = CreateTestTree(ServerStateDir); +# endif + Hydration->CreateHydrator(Config)->Dehydrate(HydrationState); + CHECK(std::filesystem::is_empty(ServerStateDir)); + + // Hydrate: restore from S3 + HydrationState = Hydration->CreateHydrator(Config)->Hydrate(); +# ifndef REAL_DATA_PATH + VerifyTree(ServerStateDir, TestFiles); +# endif + // Dehydrate again with cached state (should skip re-uploading unchanged files) + Hydration->CreateHydrator(Config)->Dehydrate(HydrationState); + CHECK(std::filesystem::is_empty(ServerStateDir)); + + // Hydrate one more time to confirm second dehydrate produced valid state + HydrationState = Hydration->CreateHydrator(Config)->Hydrate(); + // Replace files and dehydrate + TestFiles = CreateTestTree(ServerStateDir); + Hydration->CreateHydrator(Config)->Dehydrate(HydrationState); + + // Hydrate one more time to confirm second dehydrate produced valid state + 
HydrationState = Hydration->CreateHydrator(Config)->Hydrate(); + +# ifndef REAL_DATA_PATH + VerifyTree(ServerStateDir, TestFiles); +# endif // 0 + + // Dehydrate, nothing touched - no hashing, no upload + Hydration->CreateHydrator(Config)->Dehydrate(HydrationState); +} + +TEST_CASE("hydration.create_hydrator_rejects_invalid_config") +{ + // Unknown TargetSpecification prefix + CHECK_THROWS(InitHydration({.TargetSpecification = "ftp://somewhere"})); + + // Unknown Options type { - std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config); - Hydrator->Hydrate(); + std::string ParseError; + CbFieldIterator Root = LoadCompactBinaryFromJson(R"({"type":"dynamodb"})", ParseError); + ZEN_ASSERT(ParseError.empty() && Root.IsObject()); + CHECK_THROWS(InitHydration({.Options = std::move(Root).AsObject()})); } - VerifyTree(ServerStateDir, TestFiles); + // Empty Options (no type field) + CHECK_THROWS(InitHydration({})); } TEST_SUITE_END(); |