diff options
| author | Dan Engelbrecht <[email protected]> | 2026-04-21 16:26:57 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2026-04-21 16:26:57 +0200 |
| commit | 6b59d3d37dcc6320929df2f0074f9a1cb506d1fd (patch) | |
| tree | 2ddd317e381c29a97c666e9d72cf5d614a13f6f8 /src | |
| parent | zen CLI security review fixes (#974) (diff) | |
| download | archived-zen-6b59d3d37dcc6320929df2f0074f9a1cb506d1fd.tar.xz archived-zen-6b59d3d37dcc6320929df2f0074f9a1cb506d1fd.zip | |
improved s3 hydration (#997)
- Improvement: Hub shares a single S3 client and IMDS credential provider across all modules, reducing IMDS load and surviving transient IMDS blips during bulk provisioning
- Improvement: Hub validates hydration config at startup; bad `--hub-hydration-target-spec` or `--hub-hydration-target-config` now fails `zen hub` at boot instead of per-module at first hydrate
- Improvement: S3 hydration multipart chunk size configurable via `settings.chunk-size` (default 32 MiB)
- Improvement: S3 client extracts `<Error><Code>` and `<Message>` from XML error bodies (previously logged as `<unhandled content format>`)
- Improvement: S3 client fails fast with a "no credentials available" error when AWS credentials are missing, instead of sending an unsigned request that S3 rejects with a generic 400
- Improvement: IMDS credential provider retries transient connection failures (up to 3 attempts with backoff)
- Improvement: HTTP clients with `RetryCount > 0` also retry on `CURLE_COULDNT_CONNECT`
Diffstat (limited to 'src')
| -rw-r--r-- | src/zenhttp/clients/httpclientcurl.cpp | 1 | ||||
| -rw-r--r-- | src/zenhttp/httpclient.cpp | 1 | ||||
| -rw-r--r-- | src/zenserver/hub/hub.cpp | 41 | ||||
| -rw-r--r-- | src/zenserver/hub/hub.h | 5 | ||||
| -rw-r--r-- | src/zenserver/hub/hydration.cpp | 2137 | ||||
| -rw-r--r-- | src/zenserver/hub/hydration.h | 49 | ||||
| -rw-r--r-- | src/zenserver/hub/storageserverinstance.cpp | 20 | ||||
| -rw-r--r-- | src/zenserver/hub/storageserverinstance.h | 8 | ||||
| -rw-r--r-- | src/zenutil/cloud/imdscredentials.cpp | 1 | ||||
| -rw-r--r-- | src/zenutil/cloud/s3client.cpp | 184 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/cloud/s3client.h | 24 |
11 files changed, 1287 insertions, 1184 deletions
diff --git a/src/zenhttp/clients/httpclientcurl.cpp b/src/zenhttp/clients/httpclientcurl.cpp index ddf6cb58f..eee80c269 100644 --- a/src/zenhttp/clients/httpclientcurl.cpp +++ b/src/zenhttp/clients/httpclientcurl.cpp @@ -448,6 +448,7 @@ CurlHttpClient::ShouldRetry(const CurlResult& Result) { case CURLE_OK: break; + case CURLE_COULDNT_CONNECT: case CURLE_RECV_ERROR: case CURLE_SEND_ERROR: case CURLE_OPERATION_TIMEDOUT: diff --git a/src/zenhttp/httpclient.cpp b/src/zenhttp/httpclient.cpp index 3da8a9220..9d5846f71 100644 --- a/src/zenhttp/httpclient.cpp +++ b/src/zenhttp/httpclient.cpp @@ -305,6 +305,7 @@ HttpClient::Response::ToText() const case ZenContentType::kJavaScript: case ZenContentType::kJSON: case ZenContentType::kText: + case ZenContentType::kXML: case ZenContentType::kYAML: return std::string{AsText()}; diff --git a/src/zenserver/hub/hub.cpp b/src/zenserver/hub/hub.cpp index 128d3ed35..3fbb7a26e 100644 --- a/src/zenserver/hub/hub.cpp +++ b/src/zenserver/hub/hub.cpp @@ -183,20 +183,22 @@ Hub::Hub(const Configuration& Config, ZenServerEnvironment&& RunEnvironment, Asy Config.OptionalProvisionWorkerPool != Config.OptionalHydrationWorkerPool || Config.OptionalProvisionWorkerPool == nullptr, "Provision and hydration worker pools must be distinct to avoid deadlocks"); + HydrationBase::Configuration HydrationConfig; if (!m_Config.HydrationTargetSpecification.empty()) { - m_HydrationTargetSpecification = m_Config.HydrationTargetSpecification; + HydrationConfig.TargetSpecification = m_Config.HydrationTargetSpecification; } else if (!m_Config.HydrationOptions) { std::filesystem::path FileHydrationPath = m_RunEnvironment.CreateChildDir("hydration_storage"); ZEN_INFO("using file hydration path: '{}'", FileHydrationPath); - m_HydrationTargetSpecification = fmt::format("file://{}", WideToUtf8(FileHydrationPath.native())); + HydrationConfig.TargetSpecification = fmt::format("file://{}", WideToUtf8(FileHydrationPath.native())); } else { - m_HydrationOptions = m_Config.HydrationOptions; + HydrationConfig.Options = m_Config.HydrationOptions; } + m_Hydration = InitHydration(HydrationConfig); m_HydrationTempPath = m_RunEnvironment.CreateChildDir("hydration_temp"); ZEN_INFO("using hydration temp path: '{}'", m_HydrationTempPath); @@ -324,19 +326,18 @@ Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo) { auto NewInstance = std::make_unique<StorageServerInstance>( m_RunEnvironment, - StorageServerInstance::Configuration{.BasePort = GetInstanceIndexAssignedPort(ActiveInstanceIndex), - .StateDir = m_RunEnvironment.CreateChildDir(ModuleId), - .TempDir = m_HydrationTempPath / ModuleId, - .HydrationTargetSpecification = m_HydrationTargetSpecification, - .HydrationOptions = m_HydrationOptions, - .HttpThreadCount = m_Config.InstanceHttpThreadCount, - .CoreLimit = m_Config.InstanceCoreLimit, - .ConfigPath = m_Config.InstanceConfigPath, - .Malloc = m_Config.InstanceMalloc, - .Trace = m_Config.InstanceTrace, - .TraceHost = m_Config.InstanceTraceHost, - .TraceFile = m_Config.InstanceTraceFile, - .OptionalWorkerPool = m_Config.OptionalHydrationWorkerPool}, + *m_Hydration, + StorageServerInstance::Configuration{.BasePort = GetInstanceIndexAssignedPort(ActiveInstanceIndex), + .StateDir = m_RunEnvironment.CreateChildDir(ModuleId), + .TempDir = m_HydrationTempPath / ModuleId, + .HttpThreadCount = m_Config.InstanceHttpThreadCount, + .CoreLimit = m_Config.InstanceCoreLimit, + .ConfigPath = m_Config.InstanceConfigPath, + .Malloc = m_Config.InstanceMalloc, + .Trace = m_Config.InstanceTrace, + .TraceHost = m_Config.InstanceTraceHost, + .TraceFile = m_Config.InstanceTraceFile, + .OptionalWorkerPool = m_Config.OptionalHydrationWorkerPool}, ModuleId); #if ZEN_PLATFORM_WINDOWS @@ -1216,11 +1217,7 @@ Hub::ObliterateBackendData(std::string_view ModuleId) std::atomic<bool> AbortFlag{false}; std::atomic<bool> PauseFlag{false}; - HydrationConfig Config{.ServerStateDir = ServerStateDir, - .TempDir = TempDir, - .ModuleId = std::string(ModuleId), - .TargetSpecification = m_HydrationTargetSpecification, - .Options = m_HydrationOptions}; + HydrationConfig Config{.ServerStateDir = ServerStateDir, .TempDir = TempDir, .ModuleId = std::string(ModuleId)}; if (m_Config.OptionalHydrationWorkerPool) { Config.Threading.emplace(HydrationConfig::ThreadingOptions{.WorkerPool = m_Config.OptionalHydrationWorkerPool, @@ -1228,7 +1225,7 @@ Hub::ObliterateBackendData(std::string_view ModuleId) .PauseFlag = &PauseFlag}); } - std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config); + std::unique_ptr<HydrationStrategyBase> Hydrator = m_Hydration->CreateHydrator(Config); Hydrator->Obliterate(); } diff --git a/src/zenserver/hub/hub.h b/src/zenserver/hub/hub.h index 040f34af5..40d046ce0 100644 --- a/src/zenserver/hub/hub.h +++ b/src/zenserver/hub/hub.h @@ -199,9 +199,8 @@ private: AsyncModuleStateChangeCallbackFunc m_ModuleStateChangeCallback; - std::string m_HydrationTargetSpecification; - CbObject m_HydrationOptions; - std::filesystem::path m_HydrationTempPath; + std::unique_ptr<HydrationBase> m_Hydration; + std::filesystem::path m_HydrationTempPath; #if ZEN_PLATFORM_WINDOWS JobObject m_JobObject; diff --git a/src/zenserver/hub/hydration.cpp b/src/zenserver/hub/hydration.cpp index 266296fa9..4b2294b49 100644 --- a/src/zenserver/hub/hydration.cpp +++ b/src/zenserver/hub/hydration.cpp @@ -79,15 +79,14 @@ namespace hydration_impl { CleanDirectory(WorkerPool, AbortFlag, PauseFlag, Path, std::vector<std::string>{}, {}, 0); } + /////////////////////////////////////////////////////////////////////// + // Per-module storage interface driven by IncrementalHydrator. + class StorageBase { public: - virtual ~StorageBase() {} + virtual ~StorageBase() = default; - virtual void Configure(std::string_view ModuleId, - const std::filesystem::path& TempDir, - std::string_view TargetSpecification, - const CbObject& Options) = 0; virtual void SaveMetadata(const CbObject& Data) = 0; virtual CbObject LoadMetadata() = 0; virtual CbObject GetSettings() = 0; @@ -107,141 +106,31 @@ namespace hydration_impl { virtual void Delete(ParallelWork& Work, WorkerThreadPool& WorkerPool) = 0; }; - constexpr std::string_view FileHydratorPrefix = "file://"; - constexpr std::string_view FileHydratorType = "file"; - - constexpr std::string_view S3HydratorPrefix = "s3://"; - constexpr std::string_view S3HydratorType = "s3"; - class FileStorage : public StorageBase { public: - FileStorage() {} - virtual void Configure(std::string_view ModuleId, - const std::filesystem::path& TempDir, - std::string_view TargetSpecification, - const CbObject& Options) - { - ZEN_UNUSED(TempDir); - if (!TargetSpecification.empty()) - { - m_StoragePath = Utf8ToWide(TargetSpecification.substr(FileHydratorPrefix.length())); - if (m_StoragePath.empty()) - { - throw zen::runtime_error("Hydration config 'file' type requires a directory path"); - } - } - else - { - CbObjectView Settings = Options["settings"].AsObjectView(); - std::string_view Path = Settings["path"].AsString(); - if (Path.empty()) - { - throw zen::runtime_error("Hydration config 'file' type requires 'settings.path'"); - } - m_StoragePath = Utf8ToWide(std::string(Path)); - } - m_StoragePath = m_StoragePath / ModuleId; - MakeSafeAbsolutePathInPlace(m_StoragePath); - - m_StatePathName = m_StoragePath / "current-state.cbo"; - m_CASPath = m_StoragePath / "cas"; - CreateDirectories(m_CASPath); - } - virtual void SaveMetadata(const CbObject& Data) - { - BinaryWriter Output; - SaveCompactBinary(Output, Data); - WriteFile(m_StatePathName, IoBuffer(IoBuffer::Wrap, Output.GetData(), Output.GetSize())); - } - virtual CbObject LoadMetadata() - { - if (!IsFile(m_StatePathName)) - { - return {}; - } - FileContents Content = ReadFile(m_StatePathName); - if (Content.ErrorCode) - { - ThrowSystemError(Content.ErrorCode.value(), "Failed to read state file"); - } - IoBuffer Payload = Content.Flatten(); - CbValidateError Error; - CbObject Result = ValidateAndReadCompactBinaryObject(std::move(Payload), Error); - if (Error != CbValidateError::None) - { - throw std::runtime_error(fmt::format("Failed to read {} state file. Reason: {}", m_StatePathName, ToString(Error))); - } - return Result; - } - - virtual CbObject GetSettings() override { return {}; } - virtual void ParseSettings(const CbObjectView& Settings) { ZEN_UNUSED(Settings); } - - virtual std::vector<IoHash> List() - { - DirectoryContent DirContent; - GetDirectoryContent(m_CASPath, DirectoryContentFlags::IncludeFiles, DirContent); - std::vector<IoHash> Result; - Result.reserve(DirContent.Files.size()); - for (const std::filesystem::path& Path : DirContent.Files) - { - IoHash Hash; - if (IoHash::TryParse(Path.filename().string(), Hash)) - { - Result.push_back(Hash); - } - } - return Result; - } - - virtual void Put(ParallelWork& Work, - WorkerThreadPool& WorkerPool, - const IoHash& Hash, - uint64_t Size, - const std::filesystem::path& SourcePath) - { - ZEN_UNUSED(Size); - Work.ScheduleWork(WorkerPool, - [this, Hash = IoHash(Hash), SourcePath = std::filesystem::path(SourcePath)](std::atomic<bool>& AbortFlag) { - if (!AbortFlag.load()) - { - CopyFile(SourcePath, m_CASPath / fmt::format("{}", Hash), CopyFileOptions{.EnableClone = true}); - } - }); - } + static constexpr std::string_view Prefix = "file://"; + static constexpr std::string_view Type = "file"; - virtual void Get(ParallelWork& Work, - WorkerThreadPool& WorkerPool, - const IoHash& Hash, - uint64_t Size, - const std::filesystem::path& DestinationPath) - { - ZEN_UNUSED(Size); - Work.ScheduleWork( - WorkerPool, - [this, Hash = IoHash(Hash), DestinationPath = std::filesystem::path(DestinationPath)](std::atomic<bool>& AbortFlag) { - if (!AbortFlag.load()) - { - CopyFile(m_CASPath / fmt::format("{}", Hash), DestinationPath, CopyFileOptions{.EnableClone = true}); - } - }); - } + explicit FileStorage(std::filesystem::path ModulePath); - virtual void Touch(ParallelWork& Work, WorkerThreadPool& WorkerPool, const IoHash& Hash) override - { - // Local filesystem backend does not use modification-time-based retention, so no refresh is needed. - ZEN_UNUSED(Work); - ZEN_UNUSED(WorkerPool); - ZEN_UNUSED(Hash); - } - - virtual void Delete(ParallelWork& Work, WorkerThreadPool& WorkerPool) override - { - ZEN_UNUSED(Work); - ZEN_UNUSED(WorkerPool); - DeleteDirectories(m_StoragePath); - } + virtual void SaveMetadata(const CbObject& Data) override; + virtual CbObject LoadMetadata() override; + virtual CbObject GetSettings() override { return {}; } + virtual void ParseSettings(const CbObjectView&) override {} + virtual std::vector<IoHash> List() override; + virtual void Put(ParallelWork& Work, + WorkerThreadPool& WorkerPool, + const IoHash& Hash, + uint64_t Size, + const std::filesystem::path& SourcePath) override; + virtual void Get(ParallelWork& Work, + WorkerThreadPool& WorkerPool, + const IoHash& Hash, + uint64_t Size, + const std::filesystem::path& DestinationPath) override; + virtual void Touch(ParallelWork&, WorkerThreadPool&, const IoHash&) override {} + virtual void Delete(ParallelWork& Work, WorkerThreadPool& WorkerPool) override; private: std::filesystem::path m_StoragePath; @@ -252,876 +141,1058 @@ namespace hydration_impl { class S3Storage : public StorageBase { public: - S3Storage() {} + static constexpr std::string_view Prefix = "s3://"; + static constexpr std::string_view Type = "s3"; + static constexpr uint64_t DefaultMultipartChunkSize = 32u * 1024u * 1024u; - virtual void Configure(std::string_view ModuleId, - const std::filesystem::path& TempDir, - std::string_view TargetSpecification, - const CbObject& Options) - { - m_Options = Options; + S3Storage(S3Client& Client, std::string KeyPrefix, std::filesystem::path TempDir, uint64_t MultipartChunkSize); - CbObjectView Settings = m_Options["settings"].AsObjectView(); - std::string_view Spec; - if (!TargetSpecification.empty()) - { - Spec = TargetSpecification; - Spec.remove_prefix(S3HydratorPrefix.size()); - } - else - { - std::string_view Uri = Settings["uri"].AsString(); - if (Uri.empty()) - { - throw zen::runtime_error("Incremental S3 hydration config requires 'settings.uri'"); - } - Spec = Uri; - Spec.remove_prefix(S3HydratorPrefix.size()); - } + virtual void SaveMetadata(const CbObject& Data) override; + virtual CbObject LoadMetadata() override; + virtual CbObject GetSettings() override; + virtual void ParseSettings(const CbObjectView& Settings) override; + virtual std::vector<IoHash> List() override; + virtual void Put(ParallelWork& Work, + WorkerThreadPool& WorkerPool, + const IoHash& Hash, + uint64_t Size, + const std::filesystem::path& SourcePath) override; + virtual void Get(ParallelWork& Work, + WorkerThreadPool& WorkerPool, + const IoHash& Hash, + uint64_t Size, + const std::filesystem::path& DestinationPath) override; + virtual void Touch(ParallelWork& Work, WorkerThreadPool& WorkerPool, const IoHash& Hash) override; + virtual void Delete(ParallelWork& Work, WorkerThreadPool& WorkerPool) override; - size_t SlashPos = Spec.find('/'); - std::string UserPrefix = SlashPos != std::string_view::npos ? std::string(Spec.substr(SlashPos + 1)) : std::string{}; - m_Bucket = std::string(SlashPos != std::string_view::npos ? Spec.substr(0, SlashPos) : Spec); - m_KeyPrefix = UserPrefix.empty() ? std::string(ModuleId) : UserPrefix + "/" + std::string(ModuleId); + private: + S3Client& m_Client; + std::string m_KeyPrefix; + std::filesystem::path m_TempDir; + uint64_t m_MultipartChunkSize; + }; - ZEN_ASSERT(!m_Bucket.empty()); + /////////////////////////////////////////////////////////////////////// + // FileStorage implementations - std::string Region = std::string(Settings["region"].AsString()); - if (Region.empty()) - { - Region = GetEnvVariable("AWS_DEFAULT_REGION"); - } - if (Region.empty()) - { - Region = GetEnvVariable("AWS_REGION"); - } - if (Region.empty()) - { - Region = "us-east-1"; - } - m_Region = std::move(Region); + FileStorage::FileStorage(std::filesystem::path ModulePath) : m_StoragePath(std::move(ModulePath)) + { + MakeSafeAbsolutePathInPlace(m_StoragePath); + m_StatePathName = m_StoragePath / "current-state.cbo"; + m_CASPath = m_StoragePath / "cas"; + CreateDirectories(m_CASPath); + } - std::string AccessKeyId = GetEnvVariable("AWS_ACCESS_KEY_ID"); - if (AccessKeyId.empty()) - { - m_CredentialProvider = Ref<ImdsCredentialProvider>(new ImdsCredentialProvider({})); - } - else - { - m_Credentials.AccessKeyId = std::move(AccessKeyId); - m_Credentials.SecretAccessKey = GetEnvVariable("AWS_SECRET_ACCESS_KEY"); - m_Credentials.SessionToken = GetEnvVariable("AWS_SESSION_TOKEN"); - } - m_TempDir = TempDir; - m_Client = CreateS3Client(); - } + void FileStorage::SaveMetadata(const CbObject& Data) + { + BinaryWriter Output; + SaveCompactBinary(Output, Data); + WriteFile(m_StatePathName, IoBuffer(IoBuffer::Wrap, Output.GetData(), Output.GetSize())); + } - virtual void SaveMetadata(const CbObject& Data) + CbObject FileStorage::LoadMetadata() + { + if (!IsFile(m_StatePathName)) + { + return {}; + } + FileContents Content = ReadFile(m_StatePathName); + if (Content.ErrorCode) + { + ThrowSystemError(Content.ErrorCode.value(), "Failed to read state file"); + } + IoBuffer Payload = Content.Flatten(); + CbValidateError Error; + CbObject Result = ValidateAndReadCompactBinaryObject(std::move(Payload), Error); + if (Error != CbValidateError::None) { - S3Client& Client = *m_Client; - BinaryWriter Output; - SaveCompactBinary(Output, Data); - IoBuffer Payload(IoBuffer::Clone, Output.GetData(), Output.GetSize()); + throw std::runtime_error(fmt::format("Failed to read {} state file. Reason: {}", m_StatePathName, ToString(Error))); + } + return Result; + } - std::string Key = m_KeyPrefix + "/incremental-state.cbo"; - S3Result Result = Client.PutObject(Key, std::move(Payload)); - if (!Result.IsSuccess()) + std::vector<IoHash> FileStorage::List() + { + DirectoryContent DirContent; + GetDirectoryContent(m_CASPath, DirectoryContentFlags::IncludeFiles, DirContent); + std::vector<IoHash> Result; + Result.reserve(DirContent.Files.size()); + for (const std::filesystem::path& Path : DirContent.Files) + { + IoHash Hash; + if (IoHash::TryParse(Path.filename().string(), Hash)) { - throw zen::runtime_error("Failed to save incremental metadata to '{}': {}", Key, Result.Error); + Result.push_back(Hash); } } + return Result; + } - virtual CbObject LoadMetadata() - { - S3Client& Client = *m_Client; - std::string Key = m_KeyPrefix + "/incremental-state.cbo"; - S3GetObjectResult Result = Client.GetObject(Key); - if (!Result.IsSuccess()) - { - if (Result.Error == S3GetObjectResult::NotFoundErrorText) + void FileStorage::Put(ParallelWork& Work, + WorkerThreadPool& WorkerPool, + const IoHash& Hash, + uint64_t Size, + const std::filesystem::path& SourcePath) + { + ZEN_UNUSED(Size); + Work.ScheduleWork(WorkerPool, + [this, Hash = IoHash(Hash), SourcePath = std::filesystem::path(SourcePath)](std::atomic<bool>& AbortFlag) { + if (!AbortFlag.load()) + { + CopyFile(SourcePath, m_CASPath / fmt::format("{}", Hash), CopyFileOptions{.EnableClone = true}); + } + }); + } + + void FileStorage::Get(ParallelWork& Work, + WorkerThreadPool& WorkerPool, + const IoHash& Hash, + uint64_t Size, + const std::filesystem::path& DestinationPath) + { + ZEN_UNUSED(Size); + Work.ScheduleWork( + WorkerPool, + [this, Hash = IoHash(Hash), DestinationPath = std::filesystem::path(DestinationPath)](std::atomic<bool>& AbortFlag) { + if (!AbortFlag.load()) { - return {}; + CopyFile(m_CASPath / fmt::format("{}", Hash), DestinationPath, CopyFileOptions{.EnableClone = true}); } - throw zen::runtime_error("Failed to load incremental metadata from '{}': {}", Key, Result.Error); - } + }); + } + + void FileStorage::Delete(ParallelWork& Work, WorkerThreadPool& WorkerPool) + { + ZEN_UNUSED(Work); + ZEN_UNUSED(WorkerPool); + DeleteDirectories(m_StoragePath); + } - CbValidateError Error; - CbObject Meta = ValidateAndReadCompactBinaryObject(std::move(Result.Content), Error); - if (Error != CbValidateError::None) + /////////////////////////////////////////////////////////////////////// + // S3Storage implementations + + S3Storage::S3Storage(S3Client& Client, std::string KeyPrefix, std::filesystem::path TempDir, uint64_t MultipartChunkSize) + : m_Client(Client) + , m_KeyPrefix(std::move(KeyPrefix)) + , m_TempDir(std::move(TempDir)) + , m_MultipartChunkSize(MultipartChunkSize) + { + } + + void S3Storage::SaveMetadata(const CbObject& Data) + { + BinaryWriter Output; + SaveCompactBinary(Output, Data); + IoBuffer Payload(IoBuffer::Clone, Output.GetData(), Output.GetSize()); + + std::string Key = m_KeyPrefix + "/incremental-state.cbo"; + S3Result Result = m_Client.PutObject(Key, std::move(Payload)); + if (!Result.IsSuccess()) + { + throw zen::runtime_error("Failed to save incremental metadata to '{}': {}", Key, Result.Error); + } + } + + CbObject S3Storage::LoadMetadata() + { + std::string Key = m_KeyPrefix + "/incremental-state.cbo"; + S3GetObjectResult Result = m_Client.GetObject(Key); + if (!Result.IsSuccess()) + { + if (Result.Error == S3GetObjectResult::NotFoundErrorText) { - throw zen::runtime_error("Failed to parse incremental metadata from '{}': {}", Key, ToString(Error)); + return {}; } - return Meta; + throw zen::runtime_error("Failed to load incremental metadata from '{}': {}", Key, Result.Error); } - virtual CbObject GetSettings() override + CbValidateError Error; + CbObject Meta = ValidateAndReadCompactBinaryObject(std::move(Result.Content), Error); + if (Error != CbValidateError::None) { - CbObjectWriter Writer; - Writer << "MultipartChunkSize" << m_MultipartChunkSize; - return Writer.Save(); + throw zen::runtime_error("Failed to parse incremental metadata from '{}': {}", Key, ToString(Error)); } + return Meta; + } - virtual void ParseSettings(const CbObjectView& Settings) + CbObject S3Storage::GetSettings() + { + CbObjectWriter Writer; + Writer << "MultipartChunkSize" << m_MultipartChunkSize; + return Writer.Save(); + } + + void S3Storage::ParseSettings(const CbObjectView& Settings) + { + m_MultipartChunkSize = Settings["MultipartChunkSize"].AsUInt64(S3Storage::DefaultMultipartChunkSize); + } + + std::vector<IoHash> S3Storage::List() + { + std::string CasPrefix = m_KeyPrefix + "/cas/"; + S3ListObjectsResult Result = m_Client.ListObjects(CasPrefix); + if (!Result.IsSuccess()) { - m_MultipartChunkSize = Settings["MultipartChunkSize"].AsUInt64(DefaultMultipartChunkSize); + throw zen::runtime_error("Failed to list S3 objects under '{}': {}", CasPrefix, Result.Error); } - virtual std::vector<IoHash> List() + std::vector<IoHash> Hashes; + Hashes.reserve(Result.Objects.size()); + for (const S3ObjectInfo& Obj : Result.Objects) { - S3Client& Client = *m_Client; - std::string Prefix = m_KeyPrefix + "/cas/"; - S3ListObjectsResult Result = Client.ListObjects(Prefix); - if (!Result.IsSuccess()) + size_t LastSlash = Obj.Key.rfind('/'); + if (LastSlash == std::string::npos) { - throw zen::runtime_error("Failed to list S3 objects under '{}': {}", Prefix, Result.Error); + continue; } - - std::vector<IoHash> Hashes; - Hashes.reserve(Result.Objects.size()); - for (const S3ObjectInfo& Obj : Result.Objects) + IoHash Hash; + if (IoHash::TryParse(Obj.Key.substr(LastSlash + 1), Hash)) { - size_t LastSlash = Obj.Key.rfind('/'); - if (LastSlash == std::string::npos) - { - continue; - } - IoHash Hash; - if (IoHash::TryParse(Obj.Key.substr(LastSlash + 1), Hash)) - { - Hashes.push_back(Hash); - } + Hashes.push_back(Hash); } - return Hashes; } + return Hashes; + } - virtual void Put(ParallelWork& Work, - WorkerThreadPool& WorkerPool, - const IoHash& Hash, - uint64_t Size, - const std::filesystem::path& SourcePath) - { - Work.ScheduleWork( - WorkerPool, - [this, Hash = IoHash(Hash), Size, SourcePath = std::filesystem::path(SourcePath)](std::atomic<bool>& AbortFlag) { - if (AbortFlag.load()) - { - return; - } - S3Client& Client = *m_Client; - std::string Key = m_KeyPrefix + "/cas/" + fmt::format("{}", Hash); + void S3Storage::Put(ParallelWork& Work, + WorkerThreadPool& WorkerPool, + const IoHash& Hash, + uint64_t Size, + const std::filesystem::path& SourcePath) + { + Work.ScheduleWork(WorkerPool, + [this, Hash = IoHash(Hash), Size, SourcePath = std::filesystem::path(SourcePath)](std::atomic<bool>& AbortFlag) { + if (AbortFlag.load()) + { + return; + } + S3Client& Client = m_Client; + std::string Key = m_KeyPrefix + "/cas/" + fmt::format("{}", Hash); + + if (Size >= (m_MultipartChunkSize + (m_MultipartChunkSize / 4))) + { + BasicFile File(SourcePath, BasicFile::Mode::kRead); + S3Result Result = Client.PutObjectMultipart( + Key, + Size, + [&File](uint64_t Offset, uint64_t ChunkSize) { return File.ReadRange(Offset, ChunkSize); }, + m_MultipartChunkSize); + if (!Result.IsSuccess()) + { + throw zen::runtime_error("Failed to upload '{}' to S3: {}", Key, Result.Error); + } + } + else + { + BasicFile File(SourcePath, BasicFile::Mode::kRead); + S3Result Result = Client.PutObject(Key, File.ReadAll()); + if (!Result.IsSuccess()) + { + throw zen::runtime_error("Failed to upload '{}' to S3: {}", Key, Result.Error); + } + } + }); + } - if (Size >= (m_MultipartChunkSize + (m_MultipartChunkSize / 4))) - { - BasicFile File(SourcePath, BasicFile::Mode::kRead); - S3Result Result = Client.PutObjectMultipart( - Key, - Size, - [&File](uint64_t Offset, uint64_t ChunkSize) { return File.ReadRange(Offset, ChunkSize); }, - m_MultipartChunkSize); - if (!Result.IsSuccess()) - { - throw zen::runtime_error("Failed to upload '{}' to S3: {}", Key, Result.Error); - } - } - else - { - BasicFile File(SourcePath, BasicFile::Mode::kRead); - S3Result Result = Client.PutObject(Key, File.ReadAll()); - if (!Result.IsSuccess()) - { - throw zen::runtime_error("Failed to upload '{}' to S3: {}", Key, Result.Error); - } - } - }); - } + void S3Storage::Get(ParallelWork& Work, + WorkerThreadPool& WorkerPool, + const IoHash& Hash, + uint64_t Size, + const std::filesystem::path& DestinationPath) + { + std::string Key = m_KeyPrefix + "/cas/" + fmt::format("{}", Hash); - virtual void Get(ParallelWork& Work, - WorkerThreadPool& WorkerPool, - const IoHash& Hash, - uint64_t Size, - const std::filesystem::path& DestinationPath) + if (Size >= (m_MultipartChunkSize + (m_MultipartChunkSize / 4))) { - std::string Key = m_KeyPrefix + "/cas/" + fmt::format("{}", Hash); - - if (Size >= (m_MultipartChunkSize + (m_MultipartChunkSize / 4))) + class WorkData { - class WorkData - { - public: - WorkData(const std::filesystem::path& DestPath, uint64_t Size) : m_DestFile(DestPath, BasicFile::Mode::kTruncate) - { - PrepareFileForScatteredWrite(m_DestFile.Handle(), Size); - } - ~WorkData() { m_DestFile.Flush(); } - void Write(const void* Data, uint64_t Size, uint64_t Offset) { m_DestFile.Write(Data, Size, Offset); } - - private: - BasicFile m_DestFile; - }; - - std::shared_ptr<WorkData> Data = std::make_shared<WorkData>(DestinationPath, Size); - - uint64_t Offset = 0; - while (Offset < Size) + public: + WorkData(const std::filesystem::path& DestPath, uint64_t Size) : m_DestFile(DestPath, BasicFile::Mode::kTruncate) { - uint64_t ChunkSize = std::min<uint64_t>(m_MultipartChunkSize, Size - Offset); - - Work.ScheduleWork(WorkerPool, [this, Key = Key, Offset, ChunkSize, Data](std::atomic<bool>& AbortFlag) { - if (AbortFlag) - { - return; - } - S3GetObjectResult Chunk = m_Client->GetObjectRange(Key, Offset, ChunkSize); - if (!Chunk.IsSuccess()) - { - throw zen::runtime_error("Failed to download '{}' bytes [{}-{}] from S3: {}", - Key, - Offset, - Offset + ChunkSize - 1, - Chunk.Error); - } - - Data->Write(Chunk.Content.GetData(), Chunk.Content.GetSize(), Offset); - }); - Offset += ChunkSize; + PrepareFileForScatteredWrite(m_DestFile.Handle(), Size); } - } - else - { - Work.ScheduleWork( - WorkerPool, - [this, Key = Key, DestinationPath = std::filesystem::path(DestinationPath)](std::atomic<bool>& AbortFlag) { - if (AbortFlag) - { - return; - } - S3GetObjectResult Chunk = m_Client->GetObject(Key, m_TempDir); - if (!Chunk.IsSuccess()) - { - throw zen::runtime_error("Failed to download '{}' from S3: {}", Key, Chunk.Error); - } + ~WorkData() { m_DestFile.Flush(); } + void Write(const void* Data, uint64_t Size, uint64_t Offset) { m_DestFile.Write(Data, Size, Offset); } - if (IoBufferFileReference FileRef; Chunk.Content.GetFileReference(FileRef)) - { - std::error_code Ec; - std::filesystem::path ChunkPath = PathFromHandle(FileRef.FileHandle, Ec); - if (Ec) - { - WriteFile(DestinationPath, Chunk.Content); - } - else - { - Chunk.Content.SetDeleteOnClose(false); - Chunk.Content = {}; - RenameFile(ChunkPath, DestinationPath, Ec); - if (Ec) - { - Chunk.Content = IoBufferBuilder::MakeFromFile(ChunkPath); - Chunk.Content.SetDeleteOnClose(true); - WriteFile(DestinationPath, Chunk.Content); - } - } - } - else - { - WriteFile(DestinationPath, Chunk.Content); - } - }); - } - } + private: + BasicFile m_DestFile; + }; - virtual void Touch(ParallelWork& Work, WorkerThreadPool& WorkerPool, const IoHash& Hash) override - { - Work.ScheduleWork(WorkerPool, [this, Hash = IoHash(Hash)](std::atomic<bool>& AbortFlag) { - if (AbortFlag.load()) - { - return; - } - std::string Key = m_KeyPrefix + "/cas/" + fmt::format("{}", Hash); - S3Result Result = m_Client->Touch(Key); - if (!Result.IsSuccess()) - { - throw zen::runtime_error("Failed to touch '{}' in S3: {}", Key, Result.Error); - } - }); - } + std::shared_ptr<WorkData> Data = std::make_shared<WorkData>(DestinationPath, Size); - virtual void Delete(ParallelWork& Work, WorkerThreadPool& WorkerPool) override - { - std::string Prefix = m_KeyPrefix + "/"; - S3ListObjectsResult ListResult = m_Client->ListObjects(Prefix); - if (!ListResult.IsSuccess()) + uint64_t Offset = 0; + while (Offset < Size) { - throw zen::runtime_error("Failed to list S3 objects for deletion under '{}': {}", Prefix, ListResult.Error); - } - for (const S3ObjectInfo& Obj : ListResult.Objects) - { - Work.ScheduleWork(WorkerPool, [this, Key = Obj.Key](std::atomic<bool>& AbortFlag) { - if (AbortFlag.load()) + uint64_t ChunkSize = std::min<uint64_t>(m_MultipartChunkSize, Size - Offset); + + Work.ScheduleWork(WorkerPool, [this, Key = Key, Offset, ChunkSize, Data](std::atomic<bool>& AbortFlag) { + if (AbortFlag) { return; } - S3Result DelResult = m_Client->DeleteObject(Key); - if (!DelResult.IsSuccess()) + S3GetObjectResult Chunk = m_Client.GetObjectRange(Key, Offset, ChunkSize); + if (!Chunk.IsSuccess()) { - throw zen::runtime_error("Failed to delete S3 object '{}': {}", Key, DelResult.Error); + throw zen::runtime_error("Failed to download '{}' bytes [{}-{}] from S3: {}", + Key, + Offset, + Offset + ChunkSize - 1, + Chunk.Error); } + + Data->Write(Chunk.Content.GetData(), Chunk.Content.GetSize(), Offset); }); + Offset += ChunkSize; } } - - private: - std::unique_ptr<S3Client> CreateS3Client() const + else { - S3ClientOptions Options; - Options.BucketName = m_Bucket; - Options.Region = m_Region; + Work.ScheduleWork(WorkerPool, + [this, Key = Key, DestinationPath = std::filesystem::path(DestinationPath)](std::atomic<bool>& AbortFlag) { + if (AbortFlag) + { + return; + } + S3GetObjectResult Chunk = m_Client.GetObject(Key, m_TempDir); + if (!Chunk.IsSuccess()) + { + throw zen::runtime_error("Failed to download '{}' from S3: {}", Key, Chunk.Error); + } - CbObjectView Settings = m_Options["settings"].AsObjectView(); - std::string_view Endpoint = Settings["endpoint"].AsString(); - if (!Endpoint.empty()) - { - Options.Endpoint = std::string(Endpoint); - Options.PathStyle = Settings["path-style"].AsBool(); - } + if (IoBufferFileReference FileRef; Chunk.Content.GetFileReference(FileRef)) + { + std::error_code Ec; + std::filesystem::path ChunkPath = PathFromHandle(FileRef.FileHandle, Ec); + if (Ec) + { + WriteFile(DestinationPath, Chunk.Content); + } + else + { + Chunk.Content.SetDeleteOnClose(false); + Chunk.Content = {}; + RenameFile(ChunkPath, DestinationPath, Ec); + if (Ec) + { + Chunk.Content = IoBufferBuilder::MakeFromFile(ChunkPath); + Chunk.Content.SetDeleteOnClose(true); + WriteFile(DestinationPath, Chunk.Content); + } + } + } + else + { + WriteFile(DestinationPath, Chunk.Content); + } + }); + } + } - if (m_CredentialProvider) + void S3Storage::Touch(ParallelWork& Work, WorkerThreadPool& WorkerPool, const IoHash& Hash) + { + Work.ScheduleWork(WorkerPool, [this, Hash = IoHash(Hash)](std::atomic<bool>& AbortFlag) { + if (AbortFlag.load()) { - Options.CredentialProvider = m_CredentialProvider; + return; } - else + std::string Key = m_KeyPrefix + "/cas/" + fmt::format("{}", Hash); + S3Result Result = m_Client.Touch(Key); + if (!Result.IsSuccess()) { - Options.Credentials = m_Credentials; + throw zen::runtime_error("Failed to touch '{}' in S3: {}", Key, Result.Error); } + }); + } - Options.HttpSettings.MaximumInMemoryDownloadSize = 16u * 1024u; - - return std::make_unique<S3Client>(Options); + void S3Storage::Delete(ParallelWork& Work, WorkerThreadPool& WorkerPool) + { + std::string ModulePrefix = m_KeyPrefix + "/"; + S3ListObjectsResult ListResult = m_Client.ListObjects(ModulePrefix); + if (!ListResult.IsSuccess()) + { + throw zen::runtime_error("Failed to list S3 objects for deletion under '{}': {}", ModulePrefix, ListResult.Error); } + for (const S3ObjectInfo& Obj : ListResult.Objects) + { + Work.ScheduleWork(WorkerPool, [this, Key = Obj.Key](std::atomic<bool>& AbortFlag) { + if (AbortFlag.load()) + { + return; + } + S3Result DelResult = m_Client.DeleteObject(Key); + if (!DelResult.IsSuccess()) + { + throw zen::runtime_error("Failed to delete S3 object '{}': {}", Key, DelResult.Error); + } + }); + } + } - static constexpr uint64_t DefaultMultipartChunkSize = 32u * 1024u * 1024u; - - std::string m_KeyPrefix; - CbObject m_Options; - std::string m_Bucket; - std::string m_Region; - SigV4Credentials m_Credentials; - Ref<ImdsCredentialProvider> m_CredentialProvider; - std::unique_ptr<S3Client> m_Client; - std::filesystem::path m_TempDir; - uint64_t m_MultipartChunkSize = DefaultMultipartChunkSize; - }; - -} // namespace hydration_impl + /////////////////////////////////////////////////////////////////////// + // IncrementalHydrator: the only HydrationStrategyBase implementation. + // Holds a per-module StorageBase and threading context; drives the + // hydrate/dehydrate algorithm. -using namespace hydration_impl; + class IncrementalHydrator : public HydrationStrategyBase + { + public: + IncrementalHydrator(const HydrationConfig& Config, std::unique_ptr<StorageBase> Storage); + virtual ~IncrementalHydrator() override; -/////////////////////////////////////////////////////////////////////////// + virtual void Dehydrate(const CbObject& CachedState) override; + virtual CbObject Hydrate() override; + virtual void Obliterate() override; -class IncrementalHydrator : public HydrationStrategyBase -{ -public: - IncrementalHydrator(std::unique_ptr<StorageBase>&& Storage); - virtual ~IncrementalHydrator() override; - virtual void Configure(const HydrationConfig& Config) override; - virtual void Dehydrate(const CbObject& CachedState) override; - virtual CbObject Hydrate() override; - virtual void Obliterate() override; + private: + struct Entry + { + std::filesystem::path RelativePath; + uint64_t Size; + uint64_t ModTick; + IoHash Hash; + }; -private: - struct Entry - { - std::filesystem::path RelativePath; - uint64_t Size; - uint64_t ModTick; - IoHash Hash; + std::unique_ptr<StorageBase> m_Storage; + HydrationConfig m_Config; + WorkerThreadPool m_FallbackWorkPool; + std::atomic<bool> m_FallbackAbortFlag{false}; + std::atomic<bool> m_FallbackPauseFlag{false}; + HydrationConfig::ThreadingOptions m_Threading{.WorkerPool = &m_FallbackWorkPool, + .AbortFlag = &m_FallbackAbortFlag, + .PauseFlag = &m_FallbackPauseFlag}; }; - std::unique_ptr<StorageBase> m_Storage; - HydrationConfig m_Config; - WorkerThreadPool m_FallbackWorkPool; - std::atomic<bool> m_FallbackAbortFlag{false}; - std::atomic<bool> m_FallbackPauseFlag{false}; - HydrationConfig::ThreadingOptions m_Threading{.WorkerPool = &m_FallbackWorkPool, - .AbortFlag = &m_FallbackAbortFlag, - .PauseFlag = &m_FallbackPauseFlag}; -}; + /////////////////////////////////////////////////////////////////////// + // IncrementalHydrator implementations -IncrementalHydrator::IncrementalHydrator(std::unique_ptr<StorageBase>&& Storage) : m_Storage(std::move(Storage)), m_FallbackWorkPool(0) -{ -} - -IncrementalHydrator::~IncrementalHydrator() -{ - m_Storage.reset(); -} - -void -IncrementalHydrator::Configure(const HydrationConfig& Config) -{ - m_Config = Config; - m_Storage->Configure(Config.ModuleId, Config.TempDir, Config.TargetSpecification, Config.Options); - if (Config.Threading) + IncrementalHydrator::IncrementalHydrator(const HydrationConfig& Config, std::unique_ptr<StorageBase> Storage) + : m_Storage(std::move(Storage)) + , m_Config(Config) + , m_FallbackWorkPool(0) { - m_Threading = *Config.Threading; + if (Config.Threading) + { + m_Threading = *Config.Threading; + } } -} -void -IncrementalHydrator::Dehydrate(const CbObject& CachedState) -{ - Stopwatch Timer; + IncrementalHydrator::~IncrementalHydrator() { m_Storage.reset(); } - const std::filesystem::path ServerStateDir = MakeSafeAbsolutePath(m_Config.ServerStateDir); - try + void IncrementalHydrator::Dehydrate(const CbObject& CachedState) { - std::unordered_map<std::string, size_t> StateEntryLookup; - std::vector<Entry> StateEntries; - for (CbFieldView FieldView : CachedState["Files"].AsArrayView()) - { - CbObjectView EntryView = FieldView.AsObjectView(); - std::filesystem::path RelativePath(EntryView["Path"].AsString()); - uint64_t Size = EntryView["Size"].AsUInt64(); - uint64_t ModTick = EntryView["ModTick"].AsUInt64(); - IoHash Hash = EntryView["Hash"].AsHash(); - - StateEntryLookup.insert_or_assign(RelativePath.generic_string(), StateEntries.size()); - StateEntries.push_back(Entry{.RelativePath = RelativePath, .Size = Size, .ModTick = ModTick, .Hash = Hash}); - } + Stopwatch Timer; - DirectoryContent DirContent; - GetDirectoryContent(*m_Threading.WorkerPool, - ServerStateDir, - DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::Recursive | - DirectoryContentFlags::IncludeFileSizes | DirectoryContentFlags::IncludeModificationTick, - DirContent); + const std::filesystem::path ServerStateDir = MakeSafeAbsolutePath(m_Config.ServerStateDir); + try + { + std::unordered_map<std::string, size_t> StateEntryLookup; + std::vector<Entry> StateEntries; + for (CbFieldView FieldView : CachedState["Files"].AsArrayView()) + { + CbObjectView EntryView = FieldView.AsObjectView(); + std::filesystem::path RelativePath(EntryView["Path"].AsString()); + uint64_t Size = EntryView["Size"].AsUInt64(); + uint64_t ModTick = EntryView["ModTick"].AsUInt64(); + IoHash Hash = EntryView["Hash"].AsHash(); + + StateEntryLookup.insert_or_assign(RelativePath.generic_string(), StateEntries.size()); + StateEntries.push_back(Entry{.RelativePath = RelativePath, .Size = Size, .ModTick = ModTick, .Hash = Hash}); + } - ZEN_INFO("Dehydrating module '{}' from folder '{}'. {} ({}) files", - m_Config.ModuleId, - m_Config.ServerStateDir, - DirContent.Files.size(), - NiceBytes(std::accumulate(DirContent.FileSizes.begin(), DirContent.FileSizes.end(), uint64_t(0)))); + DirectoryContent DirContent; + GetDirectoryContent(*m_Threading.WorkerPool, + ServerStateDir, + DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::Recursive | + DirectoryContentFlags::IncludeFileSizes | DirectoryContentFlags::IncludeModificationTick, + DirContent); - std::vector<Entry> Entries; - Entries.resize(DirContent.Files.size()); + ZEN_INFO("Dehydrating module '{}' from folder '{}'. {} ({}) files", + m_Config.ModuleId, + m_Config.ServerStateDir, + DirContent.Files.size(), + NiceBytes(std::accumulate(DirContent.FileSizes.begin(), DirContent.FileSizes.end(), uint64_t(0)))); - uint64_t TotalBytes = 0; - uint64_t TotalFiles = 0; - uint64_t HashedFiles = 0; - uint64_t HashedBytes = 0; + std::vector<Entry> Entries; + Entries.resize(DirContent.Files.size()); - std::unordered_set<IoHash> ExistsLookup; + uint64_t TotalBytes = 0; + uint64_t TotalFiles = 0; + uint64_t HashedFiles = 0; + uint64_t HashedBytes = 0; - { - ParallelWork Work(*m_Threading.AbortFlag, *m_Threading.PauseFlag, WorkerThreadPool::EMode::EnableBacklog); + std::unordered_set<IoHash> ExistsLookup; - for (size_t FileIndex = 0; FileIndex < DirContent.Files.size(); FileIndex++) { - const std::filesystem::path AbsPath = MakeSafeAbsolutePath(DirContent.Files[FileIndex]); - if (AbsPath.filename() == "reserve.gc") - { - continue; - } - const std::filesystem::path RelativePath = FastRelativePath(ServerStateDir, DirContent.Files[FileIndex]); - if (*RelativePath.begin() == ".sentry-native") - { - continue; - } - if (RelativePath == ".lock") - { - continue; - } + ParallelWork Work(*m_Threading.AbortFlag, *m_Threading.PauseFlag, WorkerThreadPool::EMode::EnableBacklog); - Entry& CurrentEntry = Entries[TotalFiles]; - CurrentEntry.RelativePath = RelativePath; - CurrentEntry.Size = DirContent.FileSizes[FileIndex]; - CurrentEntry.ModTick = DirContent.FileModificationTicks[FileIndex]; - - bool FoundHash = false; - if (auto KnownIt = StateEntryLookup.find(CurrentEntry.RelativePath.generic_string()); KnownIt != StateEntryLookup.end()) + for (size_t FileIndex = 0; FileIndex < DirContent.Files.size(); FileIndex++) { - const Entry& StateEntry = StateEntries[KnownIt->second]; - if (StateEntry.Size == CurrentEntry.Size && StateEntry.ModTick == CurrentEntry.ModTick) + const std::filesystem::path AbsPath = MakeSafeAbsolutePath(DirContent.Files[FileIndex]); + if (AbsPath.filename() == "reserve.gc") { - CurrentEntry.Hash = StateEntry.Hash; - FoundHash = true; + continue; + } + const std::filesystem::path RelativePath = FastRelativePath(ServerStateDir, DirContent.Files[FileIndex]); + if (*RelativePath.begin() == ".sentry-native") + { + continue; + } + if (RelativePath == ".lock") + { + continue; } - } - - if (!FoundHash) - { - Work.ScheduleWork(*m_Threading.WorkerPool, [AbsPath, EntryIndex = TotalFiles, &Entries](std::atomic<bool>& AbortFlag) { - if (AbortFlag) - { - return; - } - Entry& CurrentEntry = Entries[EntryIndex]; + Entry& CurrentEntry = Entries[TotalFiles]; + CurrentEntry.RelativePath = RelativePath; + CurrentEntry.Size = DirContent.FileSizes[FileIndex]; + CurrentEntry.ModTick = DirContent.FileModificationTicks[FileIndex]; - bool FoundHash = false; - if (AbsPath.extension().empty()) + bool FoundHash = false; + if (auto KnownIt = StateEntryLookup.find(CurrentEntry.RelativePath.generic_string()); KnownIt != StateEntryLookup.end()) + { + const Entry& StateEntry = StateEntries[KnownIt->second]; + if (StateEntry.Size == CurrentEntry.Size && StateEntry.ModTick == CurrentEntry.ModTick) { - auto It = CurrentEntry.RelativePath.begin(); - if (It != CurrentEntry.RelativePath.end() && It->filename().string().ends_with("cas")) - { - IoHash RawHash; - uint64_t RawSize; - CompressedBuffer Compressed = - CompressedBuffer::FromCompressed(SharedBuffer(IoBufferBuilder::MakeFromFile(AbsPath)), - RawHash, - RawSize); - if (Compressed) - { - // We compose a meta-hash since taking the RawHash might collide with an existing - // non-compressed file with the same content The collision is unlikely except if the - // compressed data is zero bytes causing RawHash to be the same as an empty file. - IoHashStream Hasher; - Hasher.Append(RawHash.Hash, sizeof(RawHash.Hash)); - Hasher.Append(&CurrentEntry.Size, sizeof(CurrentEntry.Size)); - CurrentEntry.Hash = Hasher.GetHash(); - FoundHash = true; - } - } + CurrentEntry.Hash = StateEntry.Hash; + FoundHash = true; } + } - if (!FoundHash) - { - CurrentEntry.Hash = IoHash::HashBuffer(IoBufferBuilder::MakeFromFile(AbsPath)); - } - }); - HashedFiles++; - HashedBytes += CurrentEntry.Size; + if (!FoundHash) + { + Work.ScheduleWork(*m_Threading.WorkerPool, + [AbsPath, EntryIndex = TotalFiles, &Entries](std::atomic<bool>& AbortFlag) { + if (AbortFlag) + { + return; + } + + Entry& CurrentEntry = Entries[EntryIndex]; + + bool FoundHash = false; + if (AbsPath.extension().empty()) + { + auto It = CurrentEntry.RelativePath.begin(); + if (It != CurrentEntry.RelativePath.end() && It->filename().string().ends_with("cas")) + { + IoHash RawHash; + uint64_t RawSize; + CompressedBuffer Compressed = CompressedBuffer::FromCompressed( + SharedBuffer(IoBufferBuilder::MakeFromFile(AbsPath)), + RawHash, + RawSize); + if (Compressed) + { + // We compose a meta-hash since taking the RawHash might collide with an + // existing non-compressed file with the same content The collision is + // unlikely except if the compressed data is zero bytes causing RawHash + // to be the same as an empty file. + IoHashStream Hasher; + Hasher.Append(RawHash.Hash, sizeof(RawHash.Hash)); + Hasher.Append(&CurrentEntry.Size, sizeof(CurrentEntry.Size)); + CurrentEntry.Hash = Hasher.GetHash(); + FoundHash = true; + } + } + } + + if (!FoundHash) + { + CurrentEntry.Hash = IoHash::HashBuffer(IoBufferBuilder::MakeFromFile(AbsPath)); + } + }); + HashedFiles++; + HashedBytes += CurrentEntry.Size; + } + TotalFiles++; + TotalBytes += CurrentEntry.Size; } - TotalFiles++; - TotalBytes += CurrentEntry.Size; - } - std::vector<IoHash> ExistingEntries = m_Storage->List(); - ExistsLookup.insert(ExistingEntries.begin(), ExistingEntries.end()); + std::vector<IoHash> ExistingEntries = m_Storage->List(); + ExistsLookup.insert(ExistingEntries.begin(), ExistingEntries.end()); - Work.Wait(); - - Entries.resize(TotalFiles); - } + Work.Wait(); - uint64_t UploadedFiles = 0; - uint64_t UploadedBytes = 0; - uint64_t TouchedFiles = 0; - uint64_t TouchedBytes = 0; - { - ParallelWork Work(*m_Threading.AbortFlag, *m_Threading.PauseFlag, WorkerThreadPool::EMode::DisableBacklog); + Entries.resize(TotalFiles); + } - for (const Entry& CurrentEntry : Entries) + uint64_t UploadedFiles = 0; + uint64_t UploadedBytes = 0; + uint64_t TouchedFiles = 0; + uint64_t TouchedBytes = 0; { - if (!ExistsLookup.contains(CurrentEntry.Hash)) - { - m_Storage->Put(Work, - *m_Threading.WorkerPool, - CurrentEntry.Hash, - CurrentEntry.Size, - MakeSafeAbsolutePath(ServerStateDir / CurrentEntry.RelativePath)); - UploadedFiles++; - UploadedBytes += CurrentEntry.Size; - } - else + ParallelWork Work(*m_Threading.AbortFlag, *m_Threading.PauseFlag, WorkerThreadPool::EMode::DisableBacklog); + + for (const Entry& CurrentEntry : Entries) { - // Refresh the backend's modification time so lifecycle-expiration policies - // do not evict CAS entries that are still referenced by this module. - m_Storage->Touch(Work, *m_Threading.WorkerPool, CurrentEntry.Hash); - TouchedFiles++; - TouchedBytes += CurrentEntry.Size; + if (!ExistsLookup.contains(CurrentEntry.Hash)) + { + m_Storage->Put(Work, + *m_Threading.WorkerPool, + CurrentEntry.Hash, + CurrentEntry.Size, + MakeSafeAbsolutePath(ServerStateDir / CurrentEntry.RelativePath)); + UploadedFiles++; + UploadedBytes += CurrentEntry.Size; + } + else + { + // Refresh the backend's modification time so lifecycle-expiration policies + // do not evict CAS entries that are still referenced by this module. + m_Storage->Touch(Work, *m_Threading.WorkerPool, CurrentEntry.Hash); + TouchedFiles++; + TouchedBytes += CurrentEntry.Size; + } } - } - Work.Wait(); - uint64_t UploadTimeMs = Timer.GetElapsedTimeMs(); - - UtcTime Now = UtcTime::Now(); - std::string UploadTimeUtc = fmt::format("{:04d}-{:02d}-{:02d}T{:02d}:{:02d}:{:02d}.{:03d}Z", - Now.Tm.tm_year + 1900, - Now.Tm.tm_mon + 1, - Now.Tm.tm_mday, - Now.Tm.tm_hour, - Now.Tm.tm_min, - Now.Tm.tm_sec, - Now.Ms); - - CbObjectWriter Meta; - Meta << "SourceFolder" << ServerStateDir.generic_string(); - Meta << "ModuleId" << m_Config.ModuleId; - Meta << "HostName" << GetMachineName(); - Meta << "UploadTimeUtc" << UploadTimeUtc; - Meta << "UploadDurationMs" << UploadTimeMs; - Meta << "TotalSizeBytes" << TotalBytes; - Meta << "StorageSettings" << m_Storage->GetSettings(); - - Meta.BeginArray("Files"); - for (const Entry& CurrentEntry : Entries) - { - Meta.BeginObject(); + Work.Wait(); + uint64_t UploadTimeMs = Timer.GetElapsedTimeMs(); + + UtcTime Now = UtcTime::Now(); + std::string UploadTimeUtc = fmt::format("{:04d}-{:02d}-{:02d}T{:02d}:{:02d}:{:02d}.{:03d}Z", + Now.Tm.tm_year + 1900, + Now.Tm.tm_mon + 1, + Now.Tm.tm_mday, + Now.Tm.tm_hour, + Now.Tm.tm_min, + Now.Tm.tm_sec, + Now.Ms); + + CbObjectWriter Meta; + Meta << "SourceFolder" << ServerStateDir.generic_string(); + Meta << "ModuleId" << m_Config.ModuleId; + Meta << "HostName" << GetMachineName(); + Meta << "UploadTimeUtc" << UploadTimeUtc; + Meta << "UploadDurationMs" << UploadTimeMs; + Meta << "TotalSizeBytes" << TotalBytes; + Meta << "StorageSettings" << m_Storage->GetSettings(); + + Meta.BeginArray("Files"); + for (const Entry& CurrentEntry : Entries) { - Meta << "Path" << CurrentEntry.RelativePath.generic_string(); - Meta << "Size" << CurrentEntry.Size; - Meta << "ModTick" << CurrentEntry.ModTick; - Meta << "Hash" << CurrentEntry.Hash; + Meta.BeginObject(); + { + Meta << "Path" << CurrentEntry.RelativePath.generic_string(); + Meta << "Size" << CurrentEntry.Size; + Meta << "ModTick" << CurrentEntry.ModTick; + Meta << "Hash" << CurrentEntry.Hash; + } + Meta.EndObject(); } - Meta.EndObject(); - } - Meta.EndArray(); - - m_Storage->SaveMetadata(Meta.Save()); - } + Meta.EndArray(); - ZEN_DEBUG("Cleaning server state '{}'", m_Config.ServerStateDir); - CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, ServerStateDir); - - ZEN_INFO( - "Dehydration of module '{}' completed from folder '{}'. Hashed {} ({}). Uploaded {} ({}). Touched {} ({}). Total {} ({}) in {}", - m_Config.ModuleId, - m_Config.ServerStateDir, - HashedFiles, - NiceBytes(HashedBytes), - UploadedFiles, - NiceBytes(UploadedBytes), - TouchedFiles, - NiceBytes(TouchedBytes), - TotalFiles, - NiceBytes(TotalBytes), - NiceTimeSpanMs(Timer.GetElapsedTimeMs())); - } - catch (const std::exception& Ex) - { - ZEN_WARN("Dehydration of module '{}' failed: {}. Leaving server state '{}'", m_Config.ModuleId, Ex.what(), m_Config.ServerStateDir); - } -} + m_Storage->SaveMetadata(Meta.Save()); + } -CbObject -IncrementalHydrator::Hydrate() -{ - Stopwatch Timer; + ZEN_DEBUG("Cleaning server state '{}'", m_Config.ServerStateDir); + CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, ServerStateDir); - const std::filesystem::path ServerStateDir = MakeSafeAbsolutePath(m_Config.ServerStateDir); - const std::filesystem::path TempDir = MakeSafeAbsolutePath(m_Config.TempDir); - try - { - CbObject Meta = m_Storage->LoadMetadata(); - if (!Meta) + ZEN_INFO( + "Dehydration of module '{}' completed from folder '{}'. Hashed {} ({}). Uploaded {} ({}). Touched {} ({}). Total {} ({}) " + "in {}", + m_Config.ModuleId, + m_Config.ServerStateDir, + HashedFiles, + NiceBytes(HashedBytes), + UploadedFiles, + NiceBytes(UploadedBytes), + TouchedFiles, + NiceBytes(TouchedBytes), + TotalFiles, + NiceBytes(TotalBytes), + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + } + catch (const std::exception& Ex) { - ZEN_INFO("No dehydrated state for module {} found, cleaning server state: '{}'", m_Config.ModuleId, m_Config.ServerStateDir); - CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, ServerStateDir); - return CbObject(); + ZEN_WARN("Dehydration of module '{}' failed: {}. Leaving server state '{}'", + m_Config.ModuleId, + Ex.what(), + m_Config.ServerStateDir); } + } - std::unordered_map<std::string, size_t> EntryLookup; - std::vector<Entry> Entries; - uint64_t TotalSize = 0; + CbObject IncrementalHydrator::Hydrate() + { + Stopwatch Timer; - for (CbFieldView FieldView : Meta["Files"]) + const std::filesystem::path ServerStateDir = MakeSafeAbsolutePath(m_Config.ServerStateDir); + const std::filesystem::path TempDir = MakeSafeAbsolutePath(m_Config.TempDir); + try { - CbObjectView EntryView = FieldView.AsObjectView(); - if (EntryView) + CbObject Meta = m_Storage->LoadMetadata(); + if (!Meta) { - Entry NewEntry = {.RelativePath = std::filesystem::path(EntryView["Path"].AsString()), - .Size = EntryView["Size"].AsUInt64(), - .ModTick = EntryView["ModTick"].AsUInt64(), - .Hash = EntryView["Hash"].AsHash()}; - TotalSize += NewEntry.Size; - EntryLookup.insert_or_assign(NewEntry.RelativePath.generic_string(), Entries.size()); - Entries.emplace_back(std::move(NewEntry)); + ZEN_INFO("No dehydrated state for module {} found, cleaning server state: '{}'", + m_Config.ModuleId, + m_Config.ServerStateDir); + CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, ServerStateDir); + return CbObject(); } - } - ZEN_INFO("Hydrating module '{}' to folder '{}'. {} ({}) files", - m_Config.ModuleId, - m_Config.ServerStateDir, - Entries.size(), - NiceBytes(TotalSize)); + std::unordered_map<std::string, size_t> EntryLookup; + std::vector<Entry> Entries; + uint64_t TotalSize = 0; + + for (CbFieldView FieldView : Meta["Files"]) + { + CbObjectView EntryView = FieldView.AsObjectView(); + if (EntryView) + { + Entry NewEntry = {.RelativePath = std::filesystem::path(EntryView["Path"].AsString()), + .Size = EntryView["Size"].AsUInt64(), + .ModTick = EntryView["ModTick"].AsUInt64(), + .Hash = EntryView["Hash"].AsHash()}; + TotalSize += NewEntry.Size; + EntryLookup.insert_or_assign(NewEntry.RelativePath.generic_string(), Entries.size()); + Entries.emplace_back(std::move(NewEntry)); + } + } - m_Storage->ParseSettings(Meta["StorageSettings"].AsObjectView()); + ZEN_INFO("Hydrating module '{}' to folder '{}'. {} ({}) files", + m_Config.ModuleId, + m_Config.ServerStateDir, + Entries.size(), + NiceBytes(TotalSize)); - uint64_t DownloadedBytes = 0; - uint64_t DownloadedFiles = 0; + m_Storage->ParseSettings(Meta["StorageSettings"].AsObjectView()); - { - ParallelWork Work(*m_Threading.AbortFlag, *m_Threading.PauseFlag, WorkerThreadPool::EMode::EnableBacklog); + uint64_t DownloadedBytes = 0; + uint64_t DownloadedFiles = 0; - for (const Entry& CurrentEntry : Entries) { - std::filesystem::path Path = MakeSafeAbsolutePath(TempDir / CurrentEntry.RelativePath); - CreateDirectories(Path.parent_path()); - m_Storage->Get(Work, *m_Threading.WorkerPool, CurrentEntry.Hash, CurrentEntry.Size, Path); - DownloadedBytes += CurrentEntry.Size; - DownloadedFiles++; + ParallelWork Work(*m_Threading.AbortFlag, *m_Threading.PauseFlag, WorkerThreadPool::EMode::EnableBacklog); + + for (const Entry& CurrentEntry : Entries) + { + std::filesystem::path Path = MakeSafeAbsolutePath(TempDir / CurrentEntry.RelativePath); + CreateDirectories(Path.parent_path()); + m_Storage->Get(Work, *m_Threading.WorkerPool, CurrentEntry.Hash, CurrentEntry.Size, Path); + DownloadedBytes += CurrentEntry.Size; + DownloadedFiles++; + } + + Work.Wait(); } - Work.Wait(); - } + // Downloaded successfully - swap into ServerStateDir + ZEN_DEBUG("Cleaning server state '{}'", m_Config.ServerStateDir); + CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, ServerStateDir); - // Downloaded successfully - swap into ServerStateDir - ZEN_DEBUG("Cleaning server state '{}'", m_Config.ServerStateDir); - CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, ServerStateDir); + // If the two paths share at least one common component they are on the same drive/volume + // and atomic renames will succeed. Otherwise fall back to a full copy. + auto [ItTmp, ItState] = std::mismatch(TempDir.begin(), TempDir.end(), ServerStateDir.begin(), ServerStateDir.end()); + if (ItTmp != TempDir.begin()) + { + DirectoryContent DirContent; + GetDirectoryContent(*m_Threading.WorkerPool, + TempDir, + DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::IncludeDirs, + DirContent); - // If the two paths share at least one common component they are on the same drive/volume - // and atomic renames will succeed. Otherwise fall back to a full copy. - auto [ItTmp, ItState] = std::mismatch(TempDir.begin(), TempDir.end(), ServerStateDir.begin(), ServerStateDir.end()); - if (ItTmp != TempDir.begin()) - { + for (const std::filesystem::path& AbsPath : DirContent.Directories) + { + std::filesystem::path Dest = MakeSafeAbsolutePath(ServerStateDir / AbsPath.filename()); + std::error_code Ec = RenameDirectoryWithRetry(AbsPath, Dest); + if (Ec) + { + throw std::system_error(Ec, fmt::format("Failed to rename directory from '{}' to '{}'", AbsPath, Dest)); + } + } + for (const std::filesystem::path& AbsPath : DirContent.Files) + { + std::filesystem::path Dest = MakeSafeAbsolutePath(ServerStateDir / AbsPath.filename()); + std::error_code Ec = RenameFileWithRetry(AbsPath, Dest); + if (Ec) + { + throw std::system_error(Ec, fmt::format("Failed to rename file from '{}' to '{}'", AbsPath, Dest)); + } + } + + ZEN_DEBUG("Cleaning temp dir '{}'", m_Config.TempDir); + CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, TempDir); + } + else + { + // Slow path: TempDir and ServerStateDir are on different filesystems, so rename + // would fail. Copy the tree instead and clean up the temp files afterwards. + ZEN_DEBUG("TempDir and ServerStateDir are on different filesystems - using CopyTree"); + CopyTree(TempDir, ServerStateDir, {.EnableClone = true}); + ZEN_DEBUG("Cleaning temp dir '{}'", m_Config.TempDir); + CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, TempDir); + } + + // TODO: This could perhaps be done more efficently, but ok for now DirectoryContent DirContent; GetDirectoryContent(*m_Threading.WorkerPool, - TempDir, - DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::IncludeDirs, + ServerStateDir, + DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::Recursive | + DirectoryContentFlags::IncludeFileSizes | DirectoryContentFlags::IncludeModificationTick, DirContent); - for (const std::filesystem::path& AbsPath : DirContent.Directories) + CbObjectWriter HydrateState; + HydrateState.BeginArray("Files"); + for (size_t FileIndex = 0; FileIndex < DirContent.Files.size(); FileIndex++) { - std::filesystem::path Dest = MakeSafeAbsolutePath(ServerStateDir / AbsPath.filename()); - std::error_code Ec = RenameDirectoryWithRetry(AbsPath, Dest); - if (Ec) + std::filesystem::path RelativePath = FastRelativePath(ServerStateDir, DirContent.Files[FileIndex]); + + if (auto It = EntryLookup.find(RelativePath.generic_string()); It != EntryLookup.end()) { - throw std::system_error(Ec, fmt::format("Failed to rename directory from '{}' to '{}'", AbsPath, Dest)); + HydrateState.BeginObject(); + { + HydrateState << "Path" << RelativePath.generic_string(); + HydrateState << "Size" << DirContent.FileSizes[FileIndex]; + HydrateState << "ModTick" << DirContent.FileModificationTicks[FileIndex]; + HydrateState << "Hash" << Entries[It->second].Hash; + } + HydrateState.EndObject(); } - } - for (const std::filesystem::path& AbsPath : DirContent.Files) - { - std::filesystem::path Dest = MakeSafeAbsolutePath(ServerStateDir / AbsPath.filename()); - std::error_code Ec = RenameFileWithRetry(AbsPath, Dest); - if (Ec) + else { - throw std::system_error(Ec, fmt::format("Failed to rename file from '{}' to '{}'", AbsPath, Dest)); + ZEN_ASSERT(false); } } + HydrateState.EndArray(); - ZEN_DEBUG("Cleaning temp dir '{}'", m_Config.TempDir); - CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, TempDir); + CbObject StateObject = HydrateState.Save(); + + ZEN_INFO("Hydration of module '{}' complete to folder '{}'. {} ({}) files in {}", + m_Config.ModuleId, + m_Config.ServerStateDir, + DownloadedFiles, + NiceBytes(DownloadedBytes), + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + + return StateObject; } - else + catch (const std::exception& Ex) { - // Slow path: TempDir and ServerStateDir are on different filesystems, so rename - // would fail. Copy the tree instead and clean up the temp files afterwards. - ZEN_DEBUG("TempDir and ServerStateDir are on different filesystems - using CopyTree"); - CopyTree(TempDir, ServerStateDir, {.EnableClone = true}); + ZEN_WARN("Hydration of module '{}' failed: {}. Cleaning server state '{}'", + m_Config.ModuleId, + Ex.what(), + m_Config.ServerStateDir); + CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, ServerStateDir); ZEN_DEBUG("Cleaning temp dir '{}'", m_Config.TempDir); CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, TempDir); + return {}; } + } - // TODO: This could perhaps be done more efficently, but ok for now - DirectoryContent DirContent; - GetDirectoryContent(*m_Threading.WorkerPool, - ServerStateDir, - DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::Recursive | - DirectoryContentFlags::IncludeFileSizes | DirectoryContentFlags::IncludeModificationTick, - DirContent); - - CbObjectWriter HydrateState; - HydrateState.BeginArray("Files"); - for (size_t FileIndex = 0; FileIndex < DirContent.Files.size(); FileIndex++) - { - std::filesystem::path RelativePath = FastRelativePath(ServerStateDir, DirContent.Files[FileIndex]); + void IncrementalHydrator::Obliterate() + { + const std::filesystem::path ServerStateDir = MakeSafeAbsolutePath(m_Config.ServerStateDir); + const std::filesystem::path TempDir = MakeSafeAbsolutePath(m_Config.TempDir); - if (auto It = EntryLookup.find(RelativePath.generic_string()); It != EntryLookup.end()) - { - HydrateState.BeginObject(); - { - HydrateState << "Path" << RelativePath.generic_string(); - HydrateState << "Size" << DirContent.FileSizes[FileIndex]; - HydrateState << "ModTick" << DirContent.FileModificationTicks[FileIndex]; - HydrateState << "Hash" << Entries[It->second].Hash; - } - HydrateState.EndObject(); - } - else - { - ZEN_ASSERT(false); - } + try + { + ParallelWork Work(*m_Threading.AbortFlag, *m_Threading.PauseFlag, WorkerThreadPool::EMode::EnableBacklog); + m_Storage->Delete(Work, *m_Threading.WorkerPool); + Work.Wait(); + } + catch (const std::exception& Ex) + { + ZEN_WARN("Failed to delete backend storage for module '{}': {}. Proceeding with local cleanup.", m_Config.ModuleId, Ex.what()); } - HydrateState.EndArray(); - - CbObject StateObject = HydrateState.Save(); - - ZEN_INFO("Hydration of module '{}' complete to folder '{}'. {} ({}) files in {}", - m_Config.ModuleId, - m_Config.ServerStateDir, - DownloadedFiles, - NiceBytes(DownloadedBytes), - NiceTimeSpanMs(Timer.GetElapsedTimeMs())); - return StateObject; - } - catch (const std::exception& Ex) - { - ZEN_WARN("Hydration of module '{}' failed: {}. Cleaning server state '{}'", m_Config.ModuleId, Ex.what(), m_Config.ServerStateDir); CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, ServerStateDir); - ZEN_DEBUG("Cleaning temp dir '{}'", m_Config.TempDir); CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, TempDir); - return {}; } -} -void -IncrementalHydrator::Obliterate() +} // namespace hydration_impl + +/////////////////////////////////////////////////////////////////////////// +// HydrationBase subclasses - own hub-wide backend state, hand per-module +// storages the exact inputs they need in CreateHydrator. + +class FileHydration : public HydrationBase { - const std::filesystem::path ServerStateDir = MakeSafeAbsolutePath(m_Config.ServerStateDir); - const std::filesystem::path TempDir = MakeSafeAbsolutePath(m_Config.TempDir); +public: + explicit FileHydration(const Configuration& Config); + + virtual std::unique_ptr<HydrationStrategyBase> CreateHydrator(const HydrationConfig& Config) override; + +private: + std::filesystem::path m_StorageRoot; +}; - try +class S3Hydration : public HydrationBase +{ +public: + explicit S3Hydration(const Configuration& Config); + + virtual std::unique_ptr<HydrationStrategyBase> CreateHydrator(const HydrationConfig& Config) override; + +private: + std::string m_Bucket; + std::string m_Region; + std::string m_Endpoint; + bool m_PathStyle = false; + std::string m_KeyPrefixRoot; + SigV4Credentials m_Credentials; + Ref<ImdsCredentialProvider> m_CredentialProvider; + std::unique_ptr<S3Client> m_Client; + uint64_t m_DefaultMultipartChunkSize; +}; + +/////////////////////////////////////////////////////////////////////////// +// Implementations + +FileHydration::FileHydration(const Configuration& Config) +{ + if (!Config.TargetSpecification.empty()) { - ParallelWork Work(*m_Threading.AbortFlag, *m_Threading.PauseFlag, WorkerThreadPool::EMode::EnableBacklog); - m_Storage->Delete(Work, *m_Threading.WorkerPool); - Work.Wait(); + m_StorageRoot = Utf8ToWide(Config.TargetSpecification.substr(hydration_impl::FileStorage::Prefix.length())); + if (m_StorageRoot.empty()) + { + throw zen::runtime_error("Hydration config 'file' type requires a directory path"); + } } - catch (const std::exception& Ex) + else { - ZEN_WARN("Failed to delete backend storage for module '{}': {}. Proceeding with local cleanup.", m_Config.ModuleId, Ex.what()); + CbObjectView Settings = Config.Options["settings"].AsObjectView(); + std::string_view Path = Settings["path"].AsString(); + if (Path.empty()) + { + throw zen::runtime_error("Hydration config 'file' type requires 'settings.path'"); + } + m_StorageRoot = Utf8ToWide(std::string(Path)); } - - CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, ServerStateDir); - CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, TempDir); + MakeSafeAbsolutePathInPlace(m_StorageRoot); } std::unique_ptr<HydrationStrategyBase> -CreateHydrator(const HydrationConfig& Config) +FileHydration::CreateHydrator(const HydrationConfig& Config) { - std::unique_ptr<StorageBase> Storage; + using namespace hydration_impl; + return std::make_unique<IncrementalHydrator>(Config, std::make_unique<FileStorage>(m_StorageRoot / Config.ModuleId)); +} +S3Hydration::S3Hydration(const Configuration& Config) +{ + using namespace hydration_impl; + + CbObjectView Settings = Config.Options["settings"].AsObjectView(); + std::string_view Spec; if (!Config.TargetSpecification.empty()) { - if (StrCaseCompare(Config.TargetSpecification.substr(0, FileHydratorPrefix.length()), FileHydratorPrefix) == 0) - { - Storage = std::make_unique<FileStorage>(); - } - else if (StrCaseCompare(Config.TargetSpecification.substr(0, S3HydratorPrefix.length()), S3HydratorPrefix) == 0) - { - Storage = std::make_unique<S3Storage>(); - } - else - { - throw std::runtime_error(fmt::format("Unknown hydration strategy: {}", Config.TargetSpecification)); - } + Spec = Config.TargetSpecification; + Spec.remove_prefix(S3Storage::Prefix.size()); } else { - std::string_view Type = Config.Options["type"].AsString(); - if (Type == FileHydratorType) + std::string_view Uri = Settings["uri"].AsString(); + if (Uri.empty()) { - Storage = std::make_unique<FileStorage>(); + throw zen::runtime_error("Incremental S3 hydration config requires 'settings.uri'"); } - else if (Type == S3HydratorType) - { - Storage = std::make_unique<S3Storage>(); - } - else if (!Type.empty()) + Spec = Uri; + Spec.remove_prefix(S3Storage::Prefix.size()); + } + + size_t SlashPos = Spec.find('/'); + m_Bucket = std::string(SlashPos != std::string_view::npos ? Spec.substr(0, SlashPos) : Spec); + m_KeyPrefixRoot = SlashPos != std::string_view::npos ? std::string(Spec.substr(SlashPos + 1)) : std::string{}; + + if (m_Bucket.empty()) + { + throw zen::runtime_error("Incremental S3 hydration config requires a bucket name"); + } + + std::string Region = std::string(Settings["region"].AsString()); + if (Region.empty()) + { + Region = GetEnvVariable("AWS_DEFAULT_REGION"); + } + if (Region.empty()) + { + Region = GetEnvVariable("AWS_REGION"); + } + if (Region.empty()) + { + Region = "us-east-1"; + } + m_Region = std::move(Region); + + std::string_view Endpoint = Settings["endpoint"].AsString(); + if (!Endpoint.empty()) + { + m_Endpoint = std::string(Endpoint); + m_PathStyle = Settings["path-style"].AsBool(); + } + + std::string AccessKeyId = GetEnvVariable("AWS_ACCESS_KEY_ID"); + if (AccessKeyId.empty()) + { + m_CredentialProvider = Ref<ImdsCredentialProvider>(new ImdsCredentialProvider({})); + } + else + { + m_Credentials.AccessKeyId = std::move(AccessKeyId); + m_Credentials.SecretAccessKey = GetEnvVariable("AWS_SECRET_ACCESS_KEY"); + m_Credentials.SessionToken = GetEnvVariable("AWS_SESSION_TOKEN"); + } + + m_DefaultMultipartChunkSize = Settings["chunk-size"].AsUInt64(S3Storage::DefaultMultipartChunkSize); + + S3ClientOptions ClientOptions; + ClientOptions.BucketName = m_Bucket; + ClientOptions.Region = m_Region; + ClientOptions.Endpoint = m_Endpoint; + ClientOptions.PathStyle = m_PathStyle; + if (m_CredentialProvider) + { + ClientOptions.CredentialProvider = m_CredentialProvider; + } + else + { + ClientOptions.Credentials = m_Credentials; + } + ClientOptions.HttpSettings.MaximumInMemoryDownloadSize = 16u * 1024u; + + m_Client = std::make_unique<S3Client>(ClientOptions); +} + +std::unique_ptr<HydrationStrategyBase> +S3Hydration::CreateHydrator(const HydrationConfig& Config) +{ + using namespace hydration_impl; + std::string KeyPrefix = m_KeyPrefixRoot.empty() ? std::string(Config.ModuleId) : fmt::format("{}/{}", m_KeyPrefixRoot, Config.ModuleId); + return std::make_unique<IncrementalHydrator>( + Config, + std::make_unique<S3Storage>(*m_Client, std::move(KeyPrefix), Config.TempDir, m_DefaultMultipartChunkSize)); +} + +std::unique_ptr<HydrationBase> +InitHydration(const HydrationBase::Configuration& Config) +{ + using namespace hydration_impl; + + if (!Config.TargetSpecification.empty()) + { + if (StrCaseCompare(Config.TargetSpecification.substr(0, FileStorage::Prefix.length()), FileStorage::Prefix) == 0) { - throw zen::runtime_error("Unknown hydration target type '{}'", Type); + return std::make_unique<FileHydration>(Config); } - else + if (StrCaseCompare(Config.TargetSpecification.substr(0, S3Storage::Prefix.length()), S3Storage::Prefix) == 0) { - throw zen::runtime_error("No hydration target configured"); + return std::make_unique<S3Hydration>(Config); } + throw zen::runtime_error("Unknown hydration strategy: {}", Config.TargetSpecification); } - auto Hydrator = std::make_unique<IncrementalHydrator>(std::move(Storage)); - Hydrator->Configure(Config); - return Hydrator; + std::string_view Type = Config.Options["type"].AsString(); + if (Type == FileStorage::Type) + { + return std::make_unique<FileHydration>(Config); + } + if (Type == S3Storage::Type) + { + return std::make_unique<S3Hydration>(Config); + } + if (!Type.empty()) + { + throw zen::runtime_error("Unknown hydration target type '{}'", Type); + } + throw zen::runtime_error("No hydration target configured"); } #if ZEN_WITH_TESTS @@ -1226,27 +1297,19 @@ TEST_CASE("hydration.file.dehydrate_hydrate") const std::string ModuleId = "testmodule"; auto TestFiles = CreateSmallTestTree(ServerStateDir); - HydrationConfig Config; - Config.ServerStateDir = ServerStateDir; - Config.TempDir = HydrationTemp; - Config.ModuleId = ModuleId; - Config.TargetSpecification = "file://" + HydrationStore.string(); + auto Hydration = InitHydration({.TargetSpecification = "file://" + HydrationStore.string()}); + + HydrationConfig Config{.ServerStateDir = ServerStateDir, .TempDir = HydrationTemp, .ModuleId = ModuleId}; // Dehydrate: copy server state to file store - { - std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config); - Hydrator->Dehydrate(CbObject()); - } + Hydration->CreateHydrator(Config)->Dehydrate(CbObject()); // Verify the module folder exists in the store and ServerStateDir was wiped CHECK(std::filesystem::exists(HydrationStore / ModuleId)); CHECK(std::filesystem::is_empty(ServerStateDir)); // Hydrate: restore server state from file store - { - std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config); - Hydrator->Hydrate(); - } + Hydration->CreateHydrator(Config)->Hydrate(); // Verify restored contents match the original VerifyTree(ServerStateDir, TestFiles); @@ -1265,26 +1328,17 @@ TEST_CASE("hydration.file.hydrate_overwrites_existing_state") auto TestFiles = CreateSmallTestTree(ServerStateDir); - HydrationConfig Config; - Config.ServerStateDir = ServerStateDir; - Config.TempDir = HydrationTemp; - Config.ModuleId = "testmodule"; - Config.TargetSpecification = "file://" + HydrationStore.string(); + auto Hydration = InitHydration({.TargetSpecification = "file://" + HydrationStore.string()}); - // Dehydrate the original state - { - std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config); - Hydrator->Dehydrate(CbObject()); - } + HydrationConfig Config{.ServerStateDir = ServerStateDir, .TempDir = HydrationTemp, .ModuleId = "testmodule"}; + + Hydration->CreateHydrator(Config)->Dehydrate(CbObject()); // Put a stale file in ServerStateDir to simulate leftover state WriteFile(ServerStateDir / "stale.bin", CreateSemiRandomBlob(256)); // Hydrate - must wipe stale file and restore original - { - std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config); - Hydrator->Hydrate(); - } + Hydration->CreateHydrator(Config)->Hydrate(); CHECK_FALSE(std::filesystem::exists(ServerStateDir / "stale.bin")); VerifyTree(ServerStateDir, TestFiles); @@ -1309,23 +1363,15 @@ TEST_CASE("hydration.file.excluded_files_not_dehydrated") WriteFile(ServerStateDir / ".sentry-native" / "db.lock", CreateSemiRandomBlob(32)); WriteFile(ServerStateDir / ".sentry-native" / "breadcrumb.json", CreateSemiRandomBlob(128)); - HydrationConfig Config; - Config.ServerStateDir = ServerStateDir; - Config.TempDir = HydrationTemp; - Config.ModuleId = "testmodule_excl"; - Config.TargetSpecification = "file://" + HydrationStore.string(); + auto Hydration = InitHydration({.TargetSpecification = "file://" + HydrationStore.string()}); - { - std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config); - Hydrator->Dehydrate(CbObject()); - } + HydrationConfig Config{.ServerStateDir = ServerStateDir, .TempDir = HydrationTemp, .ModuleId = "testmodule_excl"}; + + Hydration->CreateHydrator(Config)->Dehydrate(CbObject()); // Hydrate into a clean directory CleanDirectory(ServerStateDir, true); - { - std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config); - Hydrator->Hydrate(); - } + Hydration->CreateHydrator(Config)->Hydrate(); // Normal files must be restored VerifyTree(ServerStateDir, TestFiles); @@ -1352,17 +1398,12 @@ TEST_CASE("hydration.file.obliterate") const std::string ModuleId = "obliterate_test"; CreateSmallTestTree(ServerStateDir); - HydrationConfig Config; - Config.ServerStateDir = ServerStateDir; - Config.TempDir = HydrationTemp; - Config.ModuleId = ModuleId; - Config.TargetSpecification = "file://" + HydrationStore.string(); + auto Hydration = InitHydration({.TargetSpecification = "file://" + HydrationStore.string()}); + + HydrationConfig Config{.ServerStateDir = ServerStateDir, .TempDir = HydrationTemp, .ModuleId = ModuleId}; // Dehydrate so the backend store has data - { - std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config); - Hydrator->Dehydrate(CbObject()); - } + Hydration->CreateHydrator(Config)->Dehydrate(CbObject()); CHECK(std::filesystem::exists(HydrationStore / ModuleId)); // Put some files back in ServerStateDir and TempDir to verify cleanup @@ -1370,10 +1411,7 @@ TEST_CASE("hydration.file.obliterate") WriteFile(HydrationTemp / "leftover.tmp", CreateSemiRandomBlob(64)); // Obliterate - { - std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config); - Hydrator->Obliterate(); - } + Hydration->CreateHydrator(Config)->Obliterate(); // Backend store directory deleted CHECK_FALSE(std::filesystem::exists(HydrationStore / ModuleId)); @@ -1406,6 +1444,8 @@ TEST_CASE("hydration.file.concurrent") }; std::vector<ModuleData> Modules(kModuleCount); + auto Hydration = InitHydration({.TargetSpecification = "file://" + HydrationStore.string()}); + for (int I = 0; I < kModuleCount; ++I) { std::string ModuleId = fmt::format("file_concurrent_{}", I); @@ -1414,12 +1454,11 @@ TEST_CASE("hydration.file.concurrent") CreateDirectories(StateDir); CreateDirectories(TempPath); - Modules[I].Config.ServerStateDir = StateDir; - Modules[I].Config.TempDir = TempPath; - Modules[I].Config.ModuleId = ModuleId; - Modules[I].Config.TargetSpecification = "file://" + HydrationStore.string(); - Modules[I].Config.Threading = Threading.Options; - Modules[I].Files = CreateSmallTestTree(StateDir); + Modules[I].Config.ServerStateDir = StateDir; + Modules[I].Config.TempDir = TempPath; + Modules[I].Config.ModuleId = ModuleId; + Modules[I].Config.Threading = Threading.Options; + Modules[I].Files = CreateSmallTestTree(StateDir); } // Concurrent dehydrate @@ -1431,9 +1470,8 @@ TEST_CASE("hydration.file.concurrent") for (int I = 0; I < kModuleCount; ++I) { - Work.ScheduleWork(Pool, [&Config = Modules[I].Config](std::atomic<bool>&) { - std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config); - Hydrator->Dehydrate(CbObject()); + Work.ScheduleWork(Pool, [&Hydration, &Config = Modules[I].Config](std::atomic<bool>&) { + Hydration->CreateHydrator(Config)->Dehydrate(CbObject()); }); } Work.Wait(); @@ -1449,9 +1487,8 @@ TEST_CASE("hydration.file.concurrent") for (int I = 0; I < kModuleCount; ++I) { - Work.ScheduleWork(Pool, [&Config = Modules[I].Config](std::atomic<bool>&) { - std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config); - Hydrator->Hydrate(); + Work.ScheduleWork(Pool, [&Hydration, &Config = Modules[I].Config](std::atomic<bool>&) { + Hydration->CreateHydrator(Config)->Hydrate(); }); } Work.Wait(); @@ -1490,10 +1527,7 @@ TEST_CASE("hydration.s3.dehydrate_hydrate") CreateDirectories(ServerStateDir); CreateDirectories(HydrationTemp); - HydrationConfig Config; - Config.ServerStateDir = ServerStateDir; - Config.TempDir = HydrationTemp; - Config.ModuleId = "s3test_roundtrip"; + HydrationBase::Configuration BaseConfig; { std::string ConfigJson = fmt::format(R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test","endpoint":"{}","path-style":true}}}})", @@ -1501,39 +1535,30 @@ TEST_CASE("hydration.s3.dehydrate_hydrate") std::string ParseError; CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError); ZEN_ASSERT(ParseError.empty() && Root.IsObject()); - Config.Options = std::move(Root).AsObject(); + BaseConfig.Options = std::move(Root).AsObject(); } + auto Hydration = InitHydration(BaseConfig); + + HydrationConfig Config{.ServerStateDir = ServerStateDir, .TempDir = HydrationTemp, .ModuleId = "s3test_roundtrip"}; // Hydrate with no prior S3 state (first-boot path). Pre-populate ServerStateDir // with a stale file to confirm the cleanup branch wipes it. WriteFile(ServerStateDir / "stale.bin", CreateSemiRandomBlob(256)); - { - std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config); - Hydrator->Hydrate(); - } + Hydration->CreateHydrator(Config)->Hydrate(); CHECK(std::filesystem::is_empty(ServerStateDir)); // v1: dehydrate without a marker file CreateSmallTestTree(ServerStateDir); - { - std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config); - Hydrator->Dehydrate(CbObject()); - } + Hydration->CreateHydrator(Config)->Dehydrate(CbObject()); // v2: dehydrate WITH a marker file that only v2 has CreateSmallTestTree(ServerStateDir); WriteFile(ServerStateDir / "v2marker.bin", CreateSemiRandomBlob(64)); - { - std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config); - Hydrator->Dehydrate(CbObject()); - } + Hydration->CreateHydrator(Config)->Dehydrate(CbObject()); // Hydrate must restore v2 (the latest dehydrated state) CleanDirectory(ServerStateDir, true); - { - std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config); - Hydrator->Hydrate(); - } + Hydration->CreateHydrator(Config)->Hydrate(); // v2 marker must be present - confirms the second dehydration overwrote the first CHECK(std::filesystem::exists(ServerStateDir / "v2marker.bin")); @@ -1568,6 +1593,18 @@ TEST_CASE("hydration.s3.concurrent") }; std::vector<ModuleData> Modules(kModuleCount); + HydrationBase::Configuration BaseConfig; + { + std::string ConfigJson = + fmt::format(R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test","endpoint":"{}","path-style":true}}}})", + Minio.Endpoint()); + std::string ParseError; + CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError); + ZEN_ASSERT(ParseError.empty() && Root.IsObject()); + BaseConfig.Options = std::move(Root).AsObject(); + } + auto Hydration = InitHydration(BaseConfig); + for (int I = 0; I < kModuleCount; ++I) { std::string ModuleId = fmt::format("s3_concurrent_{}", I); @@ -1580,16 +1617,7 @@ TEST_CASE("hydration.s3.concurrent") Modules[I].Config.TempDir = TempPath; Modules[I].Config.ModuleId = ModuleId; Modules[I].Config.Threading = Threading.Options; - { - std::string ConfigJson = - fmt::format(R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test","endpoint":"{}","path-style":true}}}})", - Minio.Endpoint()); - std::string ParseError; - CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError); - ZEN_ASSERT(ParseError.empty() && Root.IsObject()); - Modules[I].Config.Options = std::move(Root).AsObject(); - } - Modules[I].Files = CreateTestTree(StateDir); + Modules[I].Files = CreateTestTree(StateDir); } // Concurrent dehydrate @@ -1601,9 +1629,8 @@ TEST_CASE("hydration.s3.concurrent") for (int I = 0; I < kModuleCount; ++I) { - Work.ScheduleWork(Pool, [&Config = Modules[I].Config](std::atomic<bool>&) { - std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config); - Hydrator->Dehydrate(CbObject()); + Work.ScheduleWork(Pool, [&Hydration, &Config = Modules[I].Config](std::atomic<bool>&) { + Hydration->CreateHydrator(Config)->Dehydrate(CbObject()); }); } Work.Wait(); @@ -1619,10 +1646,9 @@ TEST_CASE("hydration.s3.concurrent") for (int I = 0; I < kModuleCount; ++I) { - Work.ScheduleWork(Pool, [&Config = Modules[I].Config](std::atomic<bool>&) { + Work.ScheduleWork(Pool, [&Hydration, &Config = Modules[I].Config](std::atomic<bool>&) { CleanDirectory(Config.ServerStateDir, true); - std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config); - Hydrator->Hydrate(); + Hydration->CreateHydrator(Config)->Hydrate(); }); } Work.Wait(); @@ -1656,10 +1682,7 @@ TEST_CASE("hydration.s3.obliterate") const std::string ModuleId = "s3test_obliterate"; - HydrationConfig Config; - Config.ServerStateDir = ServerStateDir; - Config.TempDir = HydrationTemp; - Config.ModuleId = ModuleId; + HydrationBase::Configuration BaseConfig; { std::string ConfigJson = fmt::format(R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test","endpoint":"{}","path-style":true}}}})", @@ -1667,15 +1690,15 @@ TEST_CASE("hydration.s3.obliterate") std::string ParseError; CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError); ZEN_ASSERT(ParseError.empty() && Root.IsObject()); - Config.Options = std::move(Root).AsObject(); + BaseConfig.Options = std::move(Root).AsObject(); } + auto Hydration = InitHydration(BaseConfig); + + HydrationConfig Config{.ServerStateDir = ServerStateDir, .TempDir = HydrationTemp, .ModuleId = ModuleId}; // Dehydrate to populate backend CreateSmallTestTree(ServerStateDir); - { - std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config); - Hydrator->Dehydrate(CbObject()); - } + Hydration->CreateHydrator(Config)->Dehydrate(CbObject()); auto ListModuleObjects = [&]() { S3ClientOptions Opts; @@ -1696,10 +1719,7 @@ TEST_CASE("hydration.s3.obliterate") WriteFile(HydrationTemp / "leftover.tmp", CreateSemiRandomBlob(64)); // Obliterate - { - std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config); - Hydrator->Obliterate(); - } + Hydration->CreateHydrator(Config)->Obliterate(); // Verify S3 objects deleted CHECK(ListModuleObjects().Objects.empty()); @@ -1731,10 +1751,7 @@ TEST_CASE("hydration.s3.config_overrides") { auto TestFiles = CreateSmallTestTree(ServerStateDir); - HydrationConfig Config; - Config.ServerStateDir = ServerStateDir; - Config.TempDir = HydrationTemp; - Config.ModuleId = "s3test_prefix"; + HydrationBase::Configuration BaseConfig; { std::string ConfigJson = fmt::format( R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test/team/project","endpoint":"{}","path-style":true}}}})", @@ -1742,20 +1759,17 @@ TEST_CASE("hydration.s3.config_overrides") std::string ParseError; CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError); ZEN_ASSERT(ParseError.empty() && Root.IsObject()); - Config.Options = std::move(Root).AsObject(); + BaseConfig.Options = std::move(Root).AsObject(); } + auto Hydration = InitHydration(BaseConfig); - { - std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config); - Hydrator->Dehydrate(CbObject()); - } + HydrationConfig Config{.ServerStateDir = ServerStateDir, .TempDir = HydrationTemp, .ModuleId = "s3test_prefix"}; + + Hydration->CreateHydrator(Config)->Dehydrate(CbObject()); CleanDirectory(ServerStateDir, true); - { - std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config); - Hydrator->Hydrate(); - } + Hydration->CreateHydrator(Config)->Hydrate(); VerifyTree(ServerStateDir, TestFiles); } @@ -1768,10 +1782,7 @@ TEST_CASE("hydration.s3.config_overrides") ScopedEnvVar EnvRegion("AWS_DEFAULT_REGION", "wrong-region"); - HydrationConfig Config; - Config.ServerStateDir = ServerStateDir; - Config.TempDir = HydrationTemp; - Config.ModuleId = "s3test_region_override"; + HydrationBase::Configuration BaseConfig; { std::string ConfigJson = fmt::format( R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test","endpoint":"{}","path-style":true,"region":"us-east-1"}}}})", @@ -1779,20 +1790,17 @@ TEST_CASE("hydration.s3.config_overrides") std::string ParseError; CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError); ZEN_ASSERT(ParseError.empty() && Root.IsObject()); - Config.Options = std::move(Root).AsObject(); + BaseConfig.Options = std::move(Root).AsObject(); } + auto Hydration = InitHydration(BaseConfig); - { - std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config); - Hydrator->Dehydrate(CbObject()); - } + HydrationConfig Config{.ServerStateDir = ServerStateDir, .TempDir = HydrationTemp, .ModuleId = "s3test_region_override"}; + + Hydration->CreateHydrator(Config)->Dehydrate(CbObject()); CleanDirectory(ServerStateDir, true); - { - std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config); - Hydrator->Hydrate(); - } + Hydration->CreateHydrator(Config)->Hydrate(); VerifyTree(ServerStateDir, TestFiles); } @@ -1822,25 +1830,26 @@ TEST_CASE("hydration.s3.dehydrate_hydrate.performance" * doctest::skip()) TestThreading Threading(4); - HydrationConfig Config; - Config.ServerStateDir = ServerStateDir; - Config.TempDir = HydrationTemp; - Config.ModuleId = ModuleId; - Config.Threading = Threading.Options; - std::string ConfigJson = - fmt::format(R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test","endpoint":"{}","path-style":true}}}})", - Minio.Endpoint()); - std::string ParseError; - CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError); - ZEN_ASSERT(ParseError.empty() && Root.IsObject()); - Config.Options = std::move(Root).AsObject(); - - // Dehydrate: upload server state to MinIO + HydrationBase::Configuration BaseConfig; { - ZEN_INFO("============== DEHYDRATE =============="); - std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config); - Hydrator->Dehydrate(CbObject()); + std::string ConfigJson = + fmt::format(R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test","endpoint":"{}","path-style":true}}}})", + Minio.Endpoint()); + std::string ParseError; + CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError); + ZEN_ASSERT(ParseError.empty() && Root.IsObject()); + BaseConfig.Options = std::move(Root).AsObject(); } + auto Hydration = InitHydration(BaseConfig); + + HydrationConfig Config{.ServerStateDir = ServerStateDir, + .TempDir = HydrationTemp, + .ModuleId = ModuleId, + .Threading = Threading.Options}; + + // Dehydrate: upload server state to MinIO + ZEN_INFO("============== DEHYDRATE =============="); + Hydration->CreateHydrator(Config)->Dehydrate(CbObject()); for (size_t I = 0; I < 1; I++) { @@ -1849,11 +1858,8 @@ TEST_CASE("hydration.s3.dehydrate_hydrate.performance" * doctest::skip()) CHECK(std::filesystem::is_empty(ServerStateDir)); // Hydrate: download from MinIO back to server state - { - ZEN_INFO("=============== HYDRATE ==============="); - std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config); - Hydrator->Hydrate(); - } + ZEN_INFO("=============== HYDRATE ==============="); + Hydration->CreateHydrator(Config)->Hydrate(); } } @@ -1880,23 +1886,16 @@ TEST_CASE("hydration.file.incremental") TestThreading Threading(4); - HydrationConfig Config; - Config.ServerStateDir = ServerStateDir; - Config.TempDir = HydrationTemp; - Config.ModuleId = ModuleId; - Config.TargetSpecification = "file://" + HydrationStore.string(); - Config.Threading = Threading.Options; + auto Hydration = InitHydration({.TargetSpecification = "file://" + HydrationStore.string()}); - std::unique_ptr<StorageBase> Storage = std::make_unique<FileStorage>(); - std::unique_ptr<HydrationStrategyBase> Hydrator = std::make_unique<IncrementalHydrator>(std::move(Storage)); + HydrationConfig Config{.ServerStateDir = ServerStateDir, + .TempDir = HydrationTemp, + .ModuleId = ModuleId, + .Threading = Threading.Options}; // Hydrate with no prior state - CbObject HydrationState; - { - Hydrator->Configure(Config); - HydrationState = Hydrator->Hydrate(); - CHECK_FALSE(HydrationState); - } + CbObject HydrationState = Hydration->CreateHydrator(Config)->Hydrate(); + CHECK_FALSE(HydrationState); # ifdef REAL_DATA_PATH ZEN_INFO("Writing state data..."); @@ -1906,54 +1905,33 @@ TEST_CASE("hydration.file.incremental") // Create test files and dehydrate auto TestFiles = CreateTestTree(ServerStateDir); # endif - { - Hydrator->Configure(Config); - Hydrator->Dehydrate(HydrationState); - } + Hydration->CreateHydrator(Config)->Dehydrate(HydrationState); CHECK(std::filesystem::is_empty(ServerStateDir)); - // Hydrate: restore from S3 - { - Hydrator->Configure(Config); - HydrationState = Hydrator->Hydrate(); - } + // Hydrate: restore from file store + HydrationState = Hydration->CreateHydrator(Config)->Hydrate(); # ifndef REAL_DATA_PATH VerifyTree(ServerStateDir, TestFiles); # endif // Dehydrate again with cached state (should skip re-uploading unchanged files) - { - Hydrator->Configure(Config); - Hydrator->Dehydrate(HydrationState); - } + Hydration->CreateHydrator(Config)->Dehydrate(HydrationState); CHECK(std::filesystem::is_empty(ServerStateDir)); // Hydrate one more time to confirm second dehydrate produced valid state - { - Hydrator->Configure(Config); - HydrationState = Hydrator->Hydrate(); - } + HydrationState = Hydration->CreateHydrator(Config)->Hydrate(); // Replace files and dehydrate TestFiles = CreateTestTree(ServerStateDir); - { - Hydrator->Configure(Config); - Hydrator->Dehydrate(HydrationState); - } + Hydration->CreateHydrator(Config)->Dehydrate(HydrationState); // Hydrate one more time to confirm second dehydrate produced valid state - { - Hydrator->Configure(Config); - HydrationState = Hydrator->Hydrate(); - } + HydrationState = Hydration->CreateHydrator(Config)->Hydrate(); # ifndef REAL_DATA_PATH VerifyTree(ServerStateDir, TestFiles); # endif // 0 // Dehydrate, nothing touched - no hashing, no upload - { - Hydrator->Configure(Config); - Hydrator->Dehydrate(HydrationState); - } + Hydration->CreateHydrator(Config)->Dehydrate(HydrationState); } // --------------------------------------------------------------------------- @@ -1986,11 +1964,7 @@ TEST_CASE("hydration.s3.incremental") TestThreading Threading(8); - HydrationConfig Config; - Config.ServerStateDir = ServerStateDir; - Config.TempDir = HydrationTemp; - Config.ModuleId = ModuleId; - Config.Threading = Threading.Options; + HydrationBase::Configuration BaseConfig; { std::string ConfigJson = fmt::format(R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test","endpoint":"{}","path-style":true}}}})", @@ -1998,19 +1972,18 @@ TEST_CASE("hydration.s3.incremental") std::string ParseError; CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError); ZEN_ASSERT(ParseError.empty() && Root.IsObject()); - Config.Options = std::move(Root).AsObject(); + BaseConfig.Options = std::move(Root).AsObject(); } + auto Hydration = InitHydration(BaseConfig); - std::unique_ptr<StorageBase> Storage = std::make_unique<S3Storage>(); - std::unique_ptr<HydrationStrategyBase> Hydrator = std::make_unique<IncrementalHydrator>(std::move(Storage)); + HydrationConfig Config{.ServerStateDir = ServerStateDir, + .TempDir = HydrationTemp, + .ModuleId = ModuleId, + .Threading = Threading.Options}; // Hydrate with no prior state - CbObject HydrationState; - { - Hydrator->Configure(Config); - HydrationState = Hydrator->Hydrate(); - CHECK_FALSE(HydrationState); - } + CbObject HydrationState = Hydration->CreateHydrator(Config)->Hydrate(); + CHECK_FALSE(HydrationState); # ifdef REAL_DATA_PATH ZEN_INFO("Writing state data..."); @@ -2020,83 +1993,51 @@ TEST_CASE("hydration.s3.incremental") // Create test files and dehydrate auto TestFiles = CreateTestTree(ServerStateDir); # endif - { - Hydrator->Configure(Config); - Hydrator->Dehydrate(HydrationState); - } + Hydration->CreateHydrator(Config)->Dehydrate(HydrationState); CHECK(std::filesystem::is_empty(ServerStateDir)); // Hydrate: restore from S3 - { - Hydrator->Configure(Config); - HydrationState = Hydrator->Hydrate(); - } + HydrationState = Hydration->CreateHydrator(Config)->Hydrate(); # ifndef REAL_DATA_PATH VerifyTree(ServerStateDir, TestFiles); # endif // Dehydrate again with cached state (should skip re-uploading unchanged files) - { - Hydrator->Configure(Config); - Hydrator->Dehydrate(HydrationState); - } + Hydration->CreateHydrator(Config)->Dehydrate(HydrationState); CHECK(std::filesystem::is_empty(ServerStateDir)); // Hydrate one more time to confirm second dehydrate produced valid state - { - Hydrator->Configure(Config); - HydrationState = Hydrator->Hydrate(); - } + HydrationState = Hydration->CreateHydrator(Config)->Hydrate(); // Replace files and dehydrate TestFiles = CreateTestTree(ServerStateDir); - { - Hydrator->Configure(Config); - Hydrator->Dehydrate(HydrationState); - } + Hydration->CreateHydrator(Config)->Dehydrate(HydrationState); // Hydrate one more time to confirm second dehydrate produced valid state - { - Hydrator->Configure(Config); - HydrationState = Hydrator->Hydrate(); - } + HydrationState = Hydration->CreateHydrator(Config)->Hydrate(); # ifndef REAL_DATA_PATH VerifyTree(ServerStateDir, TestFiles); # endif // 0 // Dehydrate, nothing touched - no hashing, no upload - { - Hydrator->Configure(Config); - Hydrator->Dehydrate(HydrationState); - } + Hydration->CreateHydrator(Config)->Dehydrate(HydrationState); } TEST_CASE("hydration.create_hydrator_rejects_invalid_config") { - ScopedTemporaryDirectory TempDir; - - HydrationConfig Config; - Config.ServerStateDir = TempDir.Path() / "state"; - Config.TempDir = TempDir.Path() / "temp"; - Config.ModuleId = "invalid_test"; - // Unknown TargetSpecification prefix - Config.TargetSpecification = "ftp://somewhere"; - CHECK_THROWS(CreateHydrator(Config)); + CHECK_THROWS(InitHydration({.TargetSpecification = "ftp://somewhere"})); // Unknown Options type - Config.TargetSpecification.clear(); { std::string ParseError; CbFieldIterator Root = LoadCompactBinaryFromJson(R"({"type":"dynamodb"})", ParseError); ZEN_ASSERT(ParseError.empty() && Root.IsObject()); - Config.Options = std::move(Root).AsObject(); + CHECK_THROWS(InitHydration({.Options = std::move(Root).AsObject()})); } - CHECK_THROWS(CreateHydrator(Config)); // Empty Options (no type field) - Config.Options = CbObject(); - CHECK_THROWS(CreateHydrator(Config)); + CHECK_THROWS(InitHydration({})); } TEST_SUITE_END(); diff --git a/src/zenserver/hub/hydration.h b/src/zenserver/hub/hydration.h index fc2f309b2..0455dda91 100644 --- a/src/zenserver/hub/hydration.h +++ b/src/zenserver/hub/hydration.h @@ -4,8 +4,11 @@ #include <zencore/compactbinary.h> +#include <atomic> #include <filesystem> +#include <memory> #include <optional> +#include <string> namespace zen { @@ -19,16 +22,12 @@ struct HydrationConfig std::filesystem::path TempDir; // Module ID of the server state being hydrated/dehydrated std::string ModuleId; - // Back-end specific target specification (e.g. S3 bucket, file path, etc) - std::string TargetSpecification; - // Full config object when using --hub-hydration-target-config (mutually exclusive with TargetSpecification) - CbObject Options; struct ThreadingOptions { - WorkerThreadPool* WorkerPool; - std::atomic<bool>* AbortFlag; - std::atomic<bool>* PauseFlag; + WorkerThreadPool* WorkerPool = nullptr; + std::atomic<bool>* AbortFlag = nullptr; + std::atomic<bool>* PauseFlag = nullptr; }; // External threading for parallel I/O and hashing. If not set, work runs inline on the caller's thread. @@ -41,15 +40,11 @@ struct HydrationConfig * An instance of this interface is used to perform hydration OR * dehydration of server state. It's expected to be used only once * and not reused. - * */ struct HydrationStrategyBase { virtual ~HydrationStrategyBase() = default; - // Set up the hydration target from Config. Must be called before Hydrate/Dehydrate. - virtual void Configure(const HydrationConfig& Config) = 0; - // Upload server state to the configured target. ServerStateDir is wiped on success. // On failure, ServerStateDir is left intact. virtual void Dehydrate(const CbObject& CachedState) = 0; @@ -62,8 +57,36 @@ struct HydrationStrategyBase virtual void Obliterate() = 0; }; -// Create a configured hydrator based on Config. Ready to call Hydrate/Dehydrate immediately. -std::unique_ptr<HydrationStrategyBase> CreateHydrator(const HydrationConfig& Config); +/** + * @brief Hub-wide hydration backend + * + * Constructed once per hub via InitHydration. Holds the shared connection / client / + * credentials state for the configured backend (e.g. a single S3 client and IMDS + * credential provider shared by all modules). CreateHydrator produces a ready-to-use + * per-module HydrationStrategyBase that references the shared state - no per-module + * backend setup cost. + */ +class HydrationBase +{ +public: + struct Configuration + { + // Back-end specific target specification (e.g. "s3://bucket/prefix", "file:///path") + std::string TargetSpecification; + // Full config object (mutually exclusive with TargetSpecification) + CbObject Options; + }; + + virtual ~HydrationBase() = default; + + // Create a configured per-module hydrator, ready to call Hydrate/Dehydrate/Obliterate. + virtual std::unique_ptr<HydrationStrategyBase> CreateHydrator(const HydrationConfig& Config) = 0; +}; + +// Factory: parses Config and returns the concrete backend (FileHydration or S3Hydration). +// Throws zen::runtime_error if the config cannot be resolved to a known backend or if +// backend-specific validation fails. +std::unique_ptr<HydrationBase> InitHydration(const HydrationBase::Configuration& Config); #if ZEN_WITH_TESTS void hydration_forcelink(); diff --git a/src/zenserver/hub/storageserverinstance.cpp b/src/zenserver/hub/storageserverinstance.cpp index af2c19113..97edc5223 100644 --- a/src/zenserver/hub/storageserverinstance.cpp +++ b/src/zenserver/hub/storageserverinstance.cpp @@ -11,8 +11,12 @@ namespace zen { -StorageServerInstance::StorageServerInstance(ZenServerEnvironment& RunEnvironment, const Configuration& Config, std::string_view ModuleId) -: m_Config(Config) +StorageServerInstance::StorageServerInstance(ZenServerEnvironment& RunEnvironment, + HydrationBase& Hydration, + const Configuration& Config, + std::string_view ModuleId) +: m_Hydration(Hydration) +, m_Config(Config) , m_ModuleId(ModuleId) , m_ServerInstance(RunEnvironment, ZenServerInstance::ServerMode::kStorageServer) { @@ -156,7 +160,7 @@ StorageServerInstance::ObliterateLocked() std::atomic<bool> AbortFlag{false}; std::atomic<bool> PauseFlag{false}; HydrationConfig Config = MakeHydrationConfig(AbortFlag, PauseFlag); - std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config); + std::unique_ptr<HydrationStrategyBase> Hydrator = m_Hydration.CreateHydrator(Config); Hydrator->Obliterate(); } @@ -204,7 +208,7 @@ StorageServerInstance::Hydrate() std::atomic<bool> AbortFlag{false}; std::atomic<bool> PauseFlag{false}; HydrationConfig Config = MakeHydrationConfig(AbortFlag, PauseFlag); - std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config); + std::unique_ptr<HydrationStrategyBase> Hydrator = m_Hydration.CreateHydrator(Config); m_HydrationState = Hydrator->Hydrate(); } @@ -214,18 +218,14 @@ StorageServerInstance::Dehydrate() std::atomic<bool> AbortFlag{false}; std::atomic<bool> PauseFlag{false}; HydrationConfig Config = MakeHydrationConfig(AbortFlag, PauseFlag); - std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config); + std::unique_ptr<HydrationStrategyBase> Hydrator = m_Hydration.CreateHydrator(Config); Hydrator->Dehydrate(m_HydrationState); } HydrationConfig StorageServerInstance::MakeHydrationConfig(std::atomic<bool>& AbortFlag, std::atomic<bool>& PauseFlag) { - HydrationConfig Config{.ServerStateDir = m_Config.StateDir, - .TempDir = m_Config.TempDir, - .ModuleId = m_ModuleId, - .TargetSpecification = m_Config.HydrationTargetSpecification, - .Options = m_Config.HydrationOptions}; + HydrationConfig Config{.ServerStateDir = m_Config.StateDir, .TempDir = m_Config.TempDir, .ModuleId = m_ModuleId}; if (m_Config.OptionalWorkerPool) { Config.Threading.emplace( diff --git a/src/zenserver/hub/storageserverinstance.h b/src/zenserver/hub/storageserverinstance.h index 80f8a5016..c5917afc9 100644 --- a/src/zenserver/hub/storageserverinstance.h +++ b/src/zenserver/hub/storageserverinstance.h @@ -29,8 +29,6 @@ public: uint16_t BasePort; std::filesystem::path StateDir; std::filesystem::path TempDir; - std::string HydrationTargetSpecification; - CbObject HydrationOptions; uint32_t HttpThreadCount = 0; // Automatic int CoreLimit = 0; // Automatic std::filesystem::path ConfigPath; @@ -42,7 +40,10 @@ public: WorkerThreadPool* OptionalWorkerPool = nullptr; }; - StorageServerInstance(ZenServerEnvironment& RunEnvironment, const Configuration& Config, std::string_view ModuleId); + StorageServerInstance(ZenServerEnvironment& RunEnvironment, + HydrationBase& Hydration, + const Configuration& Config, + std::string_view ModuleId); ~StorageServerInstance(); inline std::string_view GetModuleId() const { return m_ModuleId; } @@ -140,6 +141,7 @@ private: void WakeLocked(); mutable RwLock m_Lock; + HydrationBase& m_Hydration; const Configuration m_Config; std::string m_ModuleId; ZenServerInstance m_ServerInstance; diff --git a/src/zenutil/cloud/imdscredentials.cpp b/src/zenutil/cloud/imdscredentials.cpp index 433afdc3c..a23cb9c28 100644 --- a/src/zenutil/cloud/imdscredentials.cpp +++ b/src/zenutil/cloud/imdscredentials.cpp @@ -64,6 +64,7 @@ ImdsCredentialProvider::ImdsCredentialProvider(const ImdsCredentialProviderOptio .LogCategory = "imds", .ConnectTimeout = Options.ConnectTimeout, .Timeout = Options.RequestTimeout, + .RetryCount = 3, }) { ZEN_INFO("IMDS credential provider configured (endpoint: {})", m_HttpClient.GetBaseUri()); diff --git a/src/zenutil/cloud/s3client.cpp b/src/zenutil/cloud/s3client.cpp index 3d6dca562..f8bed92da 100644 --- a/src/zenutil/cloud/s3client.cpp +++ b/src/zenutil/cloud/s3client.cpp @@ -135,6 +135,41 @@ namespace { return nullptr; } + /// Extract Code/Message from an S3 XML error body. Returns true if an <Error> element was + /// found, even if Code/Message are empty. + bool ExtractS3Error(std::string_view Body, std::string_view& OutCode, std::string_view& OutMessage) + { + if (Body.find("<Error>") == std::string_view::npos) + { + return false; + } + OutCode = ExtractXmlValue(Body, "Code"); + OutMessage = ExtractXmlValue(Body, "Message"); + return true; + } + + /// Build a human-readable error message for a failed S3 response. When the response body + /// contains an S3 `<Error>` element, the Code and Message fields are included in the string + /// so transient 4xx/5xx failures (SignatureDoesNotMatch, AuthorizationHeaderMalformed, etc.) + /// show up in logs instead of being swallowed. Falls back to the generic HTTP/transport + /// message when no XML body is available (HEAD responses, transport errors). + std::string S3ErrorMessage(std::string_view Prefix, const HttpClient::Response& Response) + { + if (!Response.Error.has_value() && Response.ResponsePayload) + { + std::string_view Body(reinterpret_cast<const char*>(Response.ResponsePayload.GetData()), Response.ResponsePayload.GetSize()); + std::string_view Code; + std::string_view Message; + if (ExtractS3Error(Body, Code, Message) && (!Code.empty() || !Message.empty())) + { + ExtendableStringBuilder<256> Decoded; + DecodeXmlEntities(Message, Decoded); + return fmt::format("{}: HTTP status ({}) {} - {}", Prefix, static_cast<int>(Response.StatusCode), Code, Decoded.ToView()); + } + } + return Response.ErrorMessage(Prefix); + } + } // namespace std::string_view S3GetObjectResult::NotFoundErrorText = "Not found"; @@ -187,6 +222,14 @@ S3Client::GetCurrentCredentials() } std::string +S3Client::BuildNoCredentialsError(std::string Context) +{ + std::string Err = fmt::format("{}: no credentials available", Context); + ZEN_WARN("{}", Err); + return Err; +} + +std::string S3Client::BuildEndpoint() const { if (!m_Endpoint.empty()) @@ -286,14 +329,13 @@ S3Client::GetSigningKey(std::string_view DateStamp) } HttpClient::KeyValueMap -S3Client::SignRequest(std::string_view Method, +S3Client::SignRequest(const SigV4Credentials& Credentials, + std::string_view Method, std::string_view Path, std::string_view CanonicalQueryString, std::string_view PayloadHash, std::span<const std::pair<std::string, std::string>> ExtraSignedHeaders) { - SigV4Credentials Credentials = GetCurrentCredentials(); - std::string AmzDate = GetAmzTimestamp(); // Build sorted headers to sign (must be sorted by lowercase name) @@ -337,17 +379,23 @@ S3Client::SignRequest(std::string_view Method, S3Result S3Client::PutObject(std::string_view Key, IoBuffer Content) { + SigV4Credentials Credentials; + if (std::string Err = RequireCredentials(Credentials, "S3 PUT '{}' failed", Key); !Err.empty()) + { + return S3Result{std::move(Err)}; + } + std::string Path = KeyToPath(Key); // Hash the payload std::string PayloadHash = Sha256ToHex(ComputeSha256(Content.GetData(), Content.GetSize())); - HttpClient::KeyValueMap Headers = SignRequest("PUT", Path, "", PayloadHash); + HttpClient::KeyValueMap Headers = SignRequest(Credentials, "PUT", Path, "", PayloadHash); HttpClient::Response Response = m_HttpClient.Put(Path, Content, Headers); if (!Response.IsSuccess()) { - std::string Err = Response.ErrorMessage("S3 PUT failed"); + std::string Err = S3ErrorMessage("S3 PUT failed", Response); ZEN_WARN("S3 PUT '{}' failed: {}", Key, Err); return S3Result{std::move(Err)}; } @@ -362,9 +410,15 @@ S3Client::PutObject(std::string_view Key, IoBuffer Content) S3GetObjectResult S3Client::GetObject(std::string_view Key, const std::filesystem::path& TempFilePath) { + SigV4Credentials Credentials; + if (std::string Err = RequireCredentials(Credentials, "S3 GET '{}' failed", Key); !Err.empty()) + { + return S3GetObjectResult{S3Result{std::move(Err)}, {}}; + } + std::string Path = KeyToPath(Key); - HttpClient::KeyValueMap Headers = SignRequest("GET", Path, "", EmptyPayloadHash); + HttpClient::KeyValueMap Headers = SignRequest(Credentials, "GET", Path, "", EmptyPayloadHash); HttpClient::Response Response = m_HttpClient.Download(Path, TempFilePath, Headers); if (!Response.IsSuccess()) @@ -374,7 +428,7 @@ S3Client::GetObject(std::string_view Key, const std::filesystem::path& TempFileP return S3GetObjectResult{S3Result{.Error = std::string(S3GetObjectResult::NotFoundErrorText)}, {}}; } - std::string Err = Response.ErrorMessage("S3 GET failed"); + std::string Err = S3ErrorMessage("S3 GET failed", Response); ZEN_WARN("S3 GET '{}' failed: {}", Key, Err); return S3GetObjectResult{S3Result{std::move(Err)}, {}}; } @@ -390,9 +444,17 @@ S3GetObjectResult S3Client::GetObjectRange(std::string_view Key, uint64_t RangeStart, uint64_t RangeSize) { ZEN_ASSERT(RangeSize > 0); + + SigV4Credentials Credentials; + if (std::string Err = RequireCredentials(Credentials, "S3 GET range '{}' [{}-{}] failed", Key, RangeStart, RangeStart + RangeSize - 1); + !Err.empty()) + { + return S3GetObjectResult{S3Result{std::move(Err)}, {}}; + } + std::string Path = KeyToPath(Key); - HttpClient::KeyValueMap Headers = SignRequest("GET", Path, "", EmptyPayloadHash); + HttpClient::KeyValueMap Headers = SignRequest(Credentials, "GET", Path, "", EmptyPayloadHash); Headers->emplace("Range", fmt::format("bytes={}-{}", RangeStart, RangeStart + RangeSize - 1)); HttpClient::Response Response = m_HttpClient.Get(Path, Headers); @@ -403,7 +465,7 @@ S3Client::GetObjectRange(std::string_view Key, uint64_t RangeStart, uint64_t Ran return S3GetObjectResult{S3Result{.Error = std::string(S3GetObjectResult::NotFoundErrorText)}, {}}; } - std::string Err = Response.ErrorMessage("S3 GET range failed"); + std::string Err = S3ErrorMessage("S3 GET range failed", Response); ZEN_WARN("S3 GET range '{}' [{}-{}] failed: {}", Key, RangeStart, RangeStart + RangeSize - 1, Err); return S3GetObjectResult{S3Result{std::move(Err)}, {}}; } @@ -437,14 +499,20 @@ S3Client::GetObjectRange(std::string_view Key, uint64_t RangeStart, uint64_t Ran S3Result S3Client::DeleteObject(std::string_view Key) { + SigV4Credentials Credentials; + if (std::string Err = RequireCredentials(Credentials, "S3 DELETE '{}' failed", Key); !Err.empty()) + { + return S3Result{std::move(Err)}; + } + std::string Path = KeyToPath(Key); - HttpClient::KeyValueMap Headers = SignRequest("DELETE", Path, "", EmptyPayloadHash); + HttpClient::KeyValueMap Headers = SignRequest(Credentials, "DELETE", Path, "", EmptyPayloadHash); HttpClient::Response Response = m_HttpClient.Delete(Path, Headers); if (!Response.IsSuccess()) { - std::string Err = Response.ErrorMessage("S3 DELETE failed"); + std::string Err = S3ErrorMessage("S3 DELETE failed", Response); ZEN_WARN("S3 DELETE '{}' failed: {}", Key, Err); return S3Result{std::move(Err)}; } @@ -459,6 +527,12 @@ S3Client::DeleteObject(std::string_view Key) S3Result S3Client::Touch(std::string_view Key) { + SigV4Credentials Credentials; + if (std::string Err = RequireCredentials(Credentials, "S3 Touch '{}' failed", Key); !Err.empty()) + { + return S3Result{std::move(Err)}; + } + std::string Path = KeyToPath(Key); // x-amz-copy-source is always "/bucket/key" regardless of addressing style. @@ -469,23 +543,23 @@ S3Client::Touch(std::string_view Key) {"x-amz-metadata-directive", "REPLACE"}, }}; - HttpClient::KeyValueMap Headers = SignRequest("PUT", Path, "", EmptyPayloadHash, ExtraSigned); + HttpClient::KeyValueMap Headers = SignRequest(Credentials, "PUT", Path, "", EmptyPayloadHash, ExtraSigned); HttpClient::Response Response = m_HttpClient.Put(Path, IoBuffer{}, Headers); if (!Response.IsSuccess()) { - std::string Err = Response.ErrorMessage("S3 Touch failed"); + std::string Err = S3ErrorMessage("S3 Touch failed", Response); ZEN_WARN("S3 Touch '{}' failed: {}", Key, Err); return S3Result{std::move(Err)}; } // Copy operations can return HTTP 200 with an error in the XML body. std::string_view ResponseBody = Response.AsText(); - if (ResponseBody.find("<Error>") != std::string_view::npos) + std::string_view ErrorCode; + std::string_view ErrorMessage; + if (ExtractS3Error(ResponseBody, ErrorCode, ErrorMessage)) { - std::string_view ErrorCode = ExtractXmlValue(ResponseBody, "Code"); - std::string_view ErrorMessage = ExtractXmlValue(ResponseBody, "Message"); - std::string Err = fmt::format("S3 Touch '{}' returned error: {} - {}", Key, ErrorCode, ErrorMessage); + std::string Err = fmt::format("S3 Touch '{}' returned error: {} - {}", Key, ErrorCode, ErrorMessage); ZEN_WARN("{}", Err); return S3Result{std::move(Err)}; } @@ -500,9 +574,15 @@ S3Client::Touch(std::string_view Key) S3HeadObjectResult S3Client::HeadObject(std::string_view Key) { + SigV4Credentials Credentials; + if (std::string Err = RequireCredentials(Credentials, "S3 HEAD '{}' failed", Key); !Err.empty()) + { + return S3HeadObjectResult{S3Result{std::move(Err)}, {}, HeadObjectResult::Error}; + } + std::string Path = KeyToPath(Key); - HttpClient::KeyValueMap Headers = SignRequest("HEAD", Path, "", EmptyPayloadHash); + HttpClient::KeyValueMap Headers = SignRequest(Credentials, "HEAD", Path, "", EmptyPayloadHash); HttpClient::Response Response = m_HttpClient.Head(Path, Headers); if (!Response.IsSuccess()) @@ -512,7 +592,7 @@ S3Client::HeadObject(std::string_view Key) return S3HeadObjectResult{{}, {}, HeadObjectResult::NotFound}; } - std::string Err = Response.ErrorMessage("S3 HEAD failed"); + std::string Err = S3ErrorMessage("S3 HEAD failed", Response); ZEN_WARN("S3 HEAD '{}' failed: {}", Key, Err); return S3HeadObjectResult{S3Result{std::move(Err)}, {}, HeadObjectResult::Error}; } @@ -551,6 +631,13 @@ S3Client::ListObjects(std::string_view Prefix, uint32_t MaxKeys) for (;;) { + SigV4Credentials Credentials; + if (std::string Err = RequireCredentials(Credentials, "S3 ListObjectsV2 prefix='{}' failed", Prefix); !Err.empty()) + { + Result.Error = std::move(Err); + return Result; + } + // Build query parameters for ListObjectsV2 std::vector<std::pair<std::string, std::string>> QueryParams; QueryParams.emplace_back("list-type", "2"); @@ -569,13 +656,13 @@ S3Client::ListObjects(std::string_view Prefix, uint32_t MaxKeys) std::string CanonicalQS = BuildCanonicalQueryString(std::move(QueryParams)); std::string RootPath = BucketRootPath(); - HttpClient::KeyValueMap Headers = SignRequest("GET", RootPath, CanonicalQS, EmptyPayloadHash); + HttpClient::KeyValueMap Headers = SignRequest(Credentials, "GET", RootPath, CanonicalQS, EmptyPayloadHash); std::string FullPath = BuildRequestPath(RootPath, CanonicalQS); HttpClient::Response Response = m_HttpClient.Get(FullPath, Headers); if (!Response.IsSuccess()) { - std::string Err = Response.ErrorMessage("S3 ListObjectsV2 failed"); + std::string Err = S3ErrorMessage("S3 ListObjectsV2 failed", Response); ZEN_WARN("S3 ListObjectsV2 prefix='{}' failed: {}", Prefix, Err); Result.Error = std::move(Err); return Result; @@ -654,16 +741,22 @@ S3Client::ListObjects(std::string_view Prefix, uint32_t MaxKeys) S3CreateMultipartUploadResult S3Client::CreateMultipartUpload(std::string_view Key) { + SigV4Credentials Credentials; + if (std::string Err = RequireCredentials(Credentials, "S3 CreateMultipartUpload '{}' failed", Key); !Err.empty()) + { + return S3CreateMultipartUploadResult{S3Result{std::move(Err)}, {}}; + } + std::string Path = KeyToPath(Key); std::string CanonicalQS = BuildCanonicalQueryString({{"uploads", ""}}); - HttpClient::KeyValueMap Headers = SignRequest("POST", Path, CanonicalQS, EmptyPayloadHash); + HttpClient::KeyValueMap Headers = SignRequest(Credentials, "POST", Path, CanonicalQS, EmptyPayloadHash); std::string FullPath = BuildRequestPath(Path, CanonicalQS); HttpClient::Response Response = m_HttpClient.Post(FullPath, Headers); if (!Response.IsSuccess()) { - std::string Err = Response.ErrorMessage("S3 CreateMultipartUpload failed"); + std::string Err = S3ErrorMessage("S3 CreateMultipartUpload failed", Response); ZEN_WARN("S3 CreateMultipartUpload '{}' failed: {}", Key, Err); return S3CreateMultipartUploadResult{S3Result{std::move(Err)}, {}}; } @@ -693,6 +786,12 @@ S3Client::CreateMultipartUpload(std::string_view Key) S3UploadPartResult S3Client::UploadPart(std::string_view Key, std::string_view UploadId, uint32_t PartNumber, IoBuffer Content) { + SigV4Credentials Credentials; + if (std::string Err = RequireCredentials(Credentials, "S3 UploadPart '{}' part {} failed", Key, PartNumber); !Err.empty()) + { + return S3UploadPartResult{S3Result{std::move(Err)}, {}}; + } + std::string Path = KeyToPath(Key); std::string CanonicalQS = BuildCanonicalQueryString({ {"partNumber", fmt::format("{}", PartNumber)}, @@ -701,13 +800,13 @@ S3Client::UploadPart(std::string_view Key, std::string_view UploadId, uint32_t P std::string PayloadHash = Sha256ToHex(ComputeSha256(Content.GetData(), Content.GetSize())); - HttpClient::KeyValueMap Headers = SignRequest("PUT", Path, CanonicalQS, PayloadHash); + HttpClient::KeyValueMap Headers = SignRequest(Credentials, "PUT", Path, CanonicalQS, PayloadHash); std::string FullPath = BuildRequestPath(Path, CanonicalQS); HttpClient::Response Response = m_HttpClient.Put(FullPath, Content, Headers); if (!Response.IsSuccess()) { - std::string Err = Response.ErrorMessage(fmt::format("S3 UploadPart {} failed", PartNumber)); + std::string Err = S3ErrorMessage(fmt::format("S3 UploadPart {} failed", PartNumber), Response); ZEN_WARN("S3 UploadPart '{}' part {} failed: {}", Key, PartNumber, Err); return S3UploadPartResult{S3Result{std::move(Err)}, {}}; } @@ -733,6 +832,12 @@ S3Client::CompleteMultipartUpload(std::string_view Key, std::string_view UploadId, const std::vector<std::pair<uint32_t, std::string>>& PartETags) { + SigV4Credentials Credentials; + if (std::string Err = RequireCredentials(Credentials, "S3 CompleteMultipartUpload '{}' failed", Key); !Err.empty()) + { + return S3Result{std::move(Err)}; + } + std::string Path = KeyToPath(Key); std::string CanonicalQS = BuildCanonicalQueryString({{"uploadId", std::string(UploadId)}}); @@ -748,7 +853,7 @@ S3Client::CompleteMultipartUpload(std::string_view Key, std::string_view XmlView = XmlBody.ToView(); std::string PayloadHash = Sha256ToHex(ComputeSha256(XmlView)); - HttpClient::KeyValueMap Headers = SignRequest("POST", Path, CanonicalQS, PayloadHash); + HttpClient::KeyValueMap Headers = SignRequest(Credentials, "POST", Path, CanonicalQS, PayloadHash); Headers->emplace("Content-Type", "application/xml"); IoBuffer Payload(IoBuffer::Clone, XmlView.data(), XmlView.size()); @@ -757,18 +862,18 @@ S3Client::CompleteMultipartUpload(std::string_view Key, HttpClient::Response Response = m_HttpClient.Post(FullPath, Payload, Headers); if (!Response.IsSuccess()) { - std::string Err = Response.ErrorMessage("S3 CompleteMultipartUpload failed"); + std::string Err = S3ErrorMessage("S3 CompleteMultipartUpload failed", Response); ZEN_WARN("S3 CompleteMultipartUpload '{}' failed: {}", Key, Err); return S3Result{std::move(Err)}; } // Check for error in response body - S3 can return 200 with an error in the XML body std::string_view ResponseBody = Response.AsText(); - if (ResponseBody.find("<Error>") != std::string_view::npos) + std::string_view ErrorCode; + std::string_view ErrorMessage; + if (ExtractS3Error(ResponseBody, ErrorCode, ErrorMessage)) { - std::string_view ErrorCode = ExtractXmlValue(ResponseBody, "Code"); - std::string_view ErrorMessage = ExtractXmlValue(ResponseBody, "Message"); - std::string Err = fmt::format("S3 CompleteMultipartUpload '{}' returned error: {} - {}", Key, ErrorCode, ErrorMessage); + std::string Err = fmt::format("S3 CompleteMultipartUpload '{}' returned error: {} - {}", Key, ErrorCode, ErrorMessage); ZEN_WARN("{}", Err); return S3Result{std::move(Err)}; } @@ -783,16 +888,22 @@ S3Client::CompleteMultipartUpload(std::string_view Key, S3Result S3Client::AbortMultipartUpload(std::string_view Key, std::string_view UploadId) { + SigV4Credentials Credentials; + if (std::string Err = RequireCredentials(Credentials, "S3 AbortMultipartUpload '{}' failed", Key); !Err.empty()) + { + return S3Result{std::move(Err)}; + } + std::string Path = KeyToPath(Key); std::string CanonicalQS = BuildCanonicalQueryString({{"uploadId", std::string(UploadId)}}); - HttpClient::KeyValueMap Headers = SignRequest("DELETE", Path, CanonicalQS, EmptyPayloadHash); + HttpClient::KeyValueMap Headers = SignRequest(Credentials, "DELETE", Path, CanonicalQS, EmptyPayloadHash); std::string FullPath = BuildRequestPath(Path, CanonicalQS); HttpClient::Response Response = m_HttpClient.Delete(FullPath, Headers); if (!Response.IsSuccess()) { - std::string Err = Response.ErrorMessage("S3 AbortMultipartUpload failed"); + std::string Err = S3ErrorMessage("S3 AbortMultipartUpload failed", Response); ZEN_WARN("S3 AbortMultipartUpload '{}' failed: {}", Key, Err); return S3Result{std::move(Err)}; } @@ -819,6 +930,12 @@ S3Client::GeneratePresignedPutUrl(std::string_view Key, std::chrono::seconds Exp std::string S3Client::GeneratePresignedUrlForMethod(std::string_view Key, std::string_view Method, std::chrono::seconds ExpiresIn) { + SigV4Credentials Credentials; + if (std::string Err = RequireCredentials(Credentials, "S3 GeneratePresignedUrl '{}' {} failed", Key, Method); !Err.empty()) + { + return {}; + } + std::string Path = KeyToPath(Key); std::string Scheme = "https"; @@ -827,7 +944,6 @@ S3Client::GeneratePresignedUrlForMethod(std::string_view Key, std::string_view M Scheme = "http"; } - SigV4Credentials Credentials = GetCurrentCredentials(); return GeneratePresignedUrl(Credentials, Method, Scheme, m_Host, Path, m_Region, "s3", ExpiresIn); } diff --git a/src/zenutil/include/zenutil/cloud/s3client.h b/src/zenutil/include/zenutil/cloud/s3client.h index 4d72dd479..1ce2a768e 100644 --- a/src/zenutil/include/zenutil/cloud/s3client.h +++ b/src/zenutil/include/zenutil/cloud/s3client.h @@ -11,6 +11,10 @@ #include <zencore/thread.h> +ZEN_THIRD_PARTY_INCLUDES_START +#include <fmt/format.h> +ZEN_THIRD_PARTY_INCLUDES_END + #include <functional> #include <span> #include <string> @@ -207,7 +211,8 @@ private: /// Sign a request and return headers with Authorization, x-amz-date, x-amz-content-sha256. /// Additional x-amz-* headers that must participate in the signature are passed via /// ExtraSignedHeaders (lowercase name, value); they are also copied into the returned map. - HttpClient::KeyValueMap SignRequest(std::string_view Method, + HttpClient::KeyValueMap SignRequest(const SigV4Credentials& Credentials, + std::string_view Method, std::string_view Path, std::string_view QueryString, std::string_view PayloadHash, @@ -219,6 +224,23 @@ private: /// Get the current credentials, either from the provider or from static config SigV4Credentials GetCurrentCredentials(); + /// Populate OutCredentials and return empty string on success; on failure return a + /// "<context>: no credentials available" error (also logged). Context args are only + /// formatted on the failure path. + template<typename... ContextArgs> + std::string RequireCredentials(SigV4Credentials& OutCredentials, fmt::format_string<ContextArgs...> ContextFmt, ContextArgs&&... Args) + { + OutCredentials = GetCurrentCredentials(); + if (!OutCredentials.AccessKeyId.empty()) + { + return {}; + } + OutCredentials = {}; + return BuildNoCredentialsError(fmt::format(ContextFmt, std::forward<ContextArgs>(Args)...)); + } + + std::string BuildNoCredentialsError(std::string Context); + LoggerRef m_Log; std::string m_BucketName; std::string m_Region; |