diff options
| author | Dan Engelbrecht <[email protected]> | 2026-03-30 13:58:14 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2026-03-30 13:58:14 +0200 |
| commit | 6d75696d11aab547bb34ea22ec10fcdc594e5a44 (patch) | |
| tree | 047db726b2c4cfc05fca433561fe09f635ae88a8 /src | |
| parent | hub resource limits (#900) (diff) | |
| download | zen-6d75696d11aab547bb34ea22ec10fcdc594e5a44.tar.xz zen-6d75696d11aab547bb34ea22ec10fcdc594e5a44.zip | |
hub s3 hydrate improvements (#902)
- Feature: Added `--hub-hydration-target-config` option to specify the hydration target via a JSON config file (mutually exclusive with `--hub-hydration-target-spec`); supports `file` and `s3` types with structured settings
```json
{
"type": "file",
"settings": {
"path": "/path/to/hydration/storage"
}
}
```
```json
{
"type": "s3",
"settings": {
"uri": "s3://bucket[/prefix]",
"region": "us-east-1",
"endpoint": "http://localhost:9000",
"path-style": true
}
}
```
- Improvement: Hub hydration dehydration skips the `.sentry-native` directory
- Bugfix: Fixed `MakeSafeAbsolutePathInPlace` when a UNC prefix is present but path uses mixed delimiters
Diffstat (limited to 'src')
| -rw-r--r-- | src/zencore/filesystem.cpp | 4 | ||||
| -rw-r--r-- | src/zens3-testbed/main.cpp | 2 | ||||
| -rw-r--r-- | src/zenserver/hub/hub.cpp | 9 | ||||
| -rw-r--r-- | src/zenserver/hub/hub.h | 3 | ||||
| -rw-r--r-- | src/zenserver/hub/hydration.cpp | 399 | ||||
| -rw-r--r-- | src/zenserver/hub/hydration.h | 10 | ||||
| -rw-r--r-- | src/zenserver/hub/storageserverinstance.cpp | 6 | ||||
| -rw-r--r-- | src/zenserver/hub/storageserverinstance.h | 2 | ||||
| -rw-r--r-- | src/zenserver/hub/zenhubserver.cpp | 43 | ||||
| -rw-r--r-- | src/zenserver/hub/zenhubserver.h | 1 | ||||
| -rw-r--r-- | src/zenutil/cloud/s3client.cpp | 28 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/cloud/s3client.h | 8 |
12 files changed, 398 insertions, 117 deletions
diff --git a/src/zencore/filesystem.cpp b/src/zencore/filesystem.cpp index 0d361801f..416312cae 100644 --- a/src/zencore/filesystem.cpp +++ b/src/zencore/filesystem.cpp @@ -3275,11 +3275,11 @@ MakeSafeAbsolutePathInPlace(std::filesystem::path& Path) { if (!Path.empty()) { - std::filesystem::path AbsolutePath = std::filesystem::absolute(Path).make_preferred(); + Path = std::filesystem::absolute(Path).make_preferred(); #if ZEN_PLATFORM_WINDOWS const std::string_view Prefix = "\\\\?\\"; const std::u8string PrefixU8(Prefix.begin(), Prefix.end()); - std::u8string PathString = AbsolutePath.u8string(); + std::u8string PathString = Path.u8string(); if (!PathString.empty() && !PathString.starts_with(PrefixU8)) { PathString.insert(0, PrefixU8); diff --git a/src/zens3-testbed/main.cpp b/src/zens3-testbed/main.cpp index 4cd6b411f..1543c4d7c 100644 --- a/src/zens3-testbed/main.cpp +++ b/src/zens3-testbed/main.cpp @@ -110,7 +110,7 @@ CreateClient(const cxxopts::ParseResult& Args) if (Args.count("timeout")) { - Options.Timeout = std::chrono::milliseconds(Args["timeout"].as<int>() * 1000); + Options.HttpSettings.Timeout = std::chrono::milliseconds(Args["timeout"].as<int>() * 1000); } return S3Client(Options); diff --git a/src/zenserver/hub/hub.cpp b/src/zenserver/hub/hub.cpp index bf846d68e..76c7a8f6d 100644 --- a/src/zenserver/hub/hub.cpp +++ b/src/zenserver/hub/hub.cpp @@ -182,7 +182,11 @@ Hub::Hub(const Configuration& Config, , m_ActiveInstances(Config.InstanceLimit) , m_FreeActiveInstanceIndexes(Config.InstanceLimit) { - if (m_Config.HydrationTargetSpecification.empty()) + if (!m_Config.HydrationTargetSpecification.empty()) + { + m_HydrationTargetSpecification = m_Config.HydrationTargetSpecification; + } + else if (!m_Config.HydrationOptions) { std::filesystem::path FileHydrationPath = m_RunEnvironment.CreateChildDir("hydration_storage"); ZEN_INFO("using file hydration path: '{}'", FileHydrationPath); @@ -190,7 +194,7 @@ Hub::Hub(const Configuration& Config, } else { - m_HydrationTargetSpecification = m_Config.HydrationTargetSpecification; + m_HydrationOptions = m_Config.HydrationOptions; } m_HydrationTempPath = m_RunEnvironment.CreateChildDir("hydration_temp"); @@ -322,6 +326,7 @@ Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo) StorageServerInstance::Configuration{.BasePort = GetInstanceIndexAssignedPort(ActiveInstanceIndex), .HydrationTempPath = m_HydrationTempPath, .HydrationTargetSpecification = m_HydrationTargetSpecification, + .HydrationOptions = m_HydrationOptions, .HttpThreadCount = m_Config.InstanceHttpThreadCount, .CoreLimit = m_Config.InstanceCoreLimit, .ConfigPath = m_Config.InstanceConfigPath}, diff --git a/src/zenserver/hub/hub.h b/src/zenserver/hub/hub.h index 9895f7068..ac3e680ae 100644 --- a/src/zenserver/hub/hub.h +++ b/src/zenserver/hub/hub.h @@ -6,6 +6,7 @@ #include "resourcemetrics.h" #include "storageserverinstance.h" +#include <zencore/compactbinary.h> #include <zencore/filesystem.h> #include <zencore/system.h> #include <zenutil/zenserverprocess.h> @@ -67,6 +68,7 @@ public: int InstanceCoreLimit = 0; // Automatic std::filesystem::path InstanceConfigPath; std::string HydrationTargetSpecification; + CbObject HydrationOptions; WatchDogConfiguration WatchDog; @@ -181,6 +183,7 @@ private: AsyncModuleStateChangeCallbackFunc m_ModuleStateChangeCallback; std::string m_HydrationTargetSpecification; + CbObject m_HydrationOptions; std::filesystem::path m_HydrationTempPath; #if ZEN_PLATFORM_WINDOWS diff --git a/src/zenserver/hub/hydration.cpp b/src/zenserver/hub/hydration.cpp index 541127590..ed16bfe56 100644 --- a/src/zenserver/hub/hydration.cpp +++ b/src/zenserver/hub/hydration.cpp @@ -10,6 +10,7 @@ #include <zencore/fmtutils.h> #include <zencore/logging.h> #include <zencore/system.h> +#include <zencore/timer.h> #include <zenutil/cloud/imdscredentials.h> #include <zenutil/cloud/s3client.h> @@ -60,6 +61,7 @@ namespace { /////////////////////////////////////////////////////////////////////////// constexpr std::string_view FileHydratorPrefix = "file://"; +constexpr std::string_view FileHydratorType = "file"; struct FileHydrator : public HydrationStrategyBase { @@ -77,7 +79,21 @@ FileHydrator::Configure(const HydrationConfig& Config) { m_Config = Config; - std::filesystem::path ConfigPath(Utf8ToWide(m_Config.TargetSpecification.substr(FileHydratorPrefix.length()))); + std::filesystem::path ConfigPath; + if (!m_Config.TargetSpecification.empty()) + { + ConfigPath = Utf8ToWide(m_Config.TargetSpecification.substr(FileHydratorPrefix.length())); + } + else + { + CbObjectView Settings = m_Config.Options["settings"].AsObjectView(); + std::string_view Path = Settings["path"].AsString(); + if (Path.empty()) + { + throw zen::runtime_error("Hydration config 'file' type requires 'settings.path'"); + } + ConfigPath = Utf8ToWide(std::string(Path)); + } MakeSafeAbsolutePathInPlace(ConfigPath); if (!std::filesystem::exists(ConfigPath)) @@ -95,6 +111,8 @@ FileHydrator::Hydrate() { ZEN_INFO("Hydrating state from '{}' to '{}'", m_StorageModuleRootDir, m_Config.ServerStateDir); + Stopwatch Timer; + // Ensure target is clean ZEN_DEBUG("Wiping server state at '{}'", m_Config.ServerStateDir); const bool ForceRemoveReadOnlyFiles = true; @@ -120,6 +138,10 @@ FileHydrator::Hydrate() ZEN_DEBUG("Cleaning server state '{}'", m_Config.ServerStateDir); CleanDirectory(m_Config.ServerStateDir, ForceRemoveReadOnlyFiles); } + else + { + ZEN_INFO("Hydration complete in {}", NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + } } void @@ -127,6 +149,8 @@ FileHydrator::Dehydrate() { ZEN_INFO("Dehydrating state from '{}' to '{}'", m_Config.ServerStateDir, m_StorageModuleRootDir); + Stopwatch Timer; + const std::filesystem::path TargetDir = m_StorageModuleRootDir; // Ensure target is clean. This could be replaced with an atomic copy at a later date @@ -141,7 +165,23 @@ FileHydrator::Dehydrate() try { ZEN_DEBUG("Copying '{}' to '{}'", m_Config.ServerStateDir, TargetDir); - CopyTree(m_Config.ServerStateDir, TargetDir, {.EnableClone = true}); + for (const std::filesystem::directory_entry& Entry : std::filesystem::directory_iterator(m_Config.ServerStateDir)) + { + if (Entry.path().filename() == ".sentry-native") + { + continue; + } + std::filesystem::path Dest = TargetDir / Entry.path().filename(); + if (Entry.is_directory()) + { + CreateDirectories(Dest); + CopyTree(Entry.path(), Dest, {.EnableClone = true}); + } + else + { + CopyFile(Entry.path(), Dest, {.EnableClone = true}); + } + } } catch (std::exception& Ex) { @@ -159,11 +199,17 @@ FileHydrator::Dehydrate() ZEN_DEBUG("Wiping server state '{}'", m_Config.ServerStateDir); CleanDirectory(m_Config.ServerStateDir, ForceRemoveReadOnlyFiles); + + if (CopySuccess) + { + ZEN_INFO("Dehydration complete in {}", NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + } } /////////////////////////////////////////////////////////////////////////// constexpr std::string_view S3HydratorPrefix = "s3://"; +constexpr std::string_view S3HydratorType = "s3"; struct S3Hydrator : public HydrationStrategyBase { @@ -182,6 +228,8 @@ private: std::string m_Region; SigV4Credentials m_Credentials; Ref<ImdsCredentialProvider> m_CredentialProvider; + + static constexpr uint64_t MultipartChunkSize = 8 * 1024 * 1024; }; void @@ -189,8 +237,23 @@ S3Hydrator::Configure(const HydrationConfig& Config) { m_Config = Config; - std::string_view Spec = m_Config.TargetSpecification; - Spec.remove_prefix(S3HydratorPrefix.size()); + CbObjectView Settings = m_Config.Options["settings"].AsObjectView(); + std::string_view Spec; + if (!m_Config.TargetSpecification.empty()) + { + Spec = m_Config.TargetSpecification; + Spec.remove_prefix(S3HydratorPrefix.size()); + } + else + { + std::string_view Uri = Settings["uri"].AsString(); + if (Uri.empty()) + { + throw zen::runtime_error("Hydration config 's3' type requires 'settings.uri'"); + } + Spec = Uri; + Spec.remove_prefix(S3HydratorPrefix.size()); + } size_t SlashPos = Spec.find('/'); std::string UserPrefix = SlashPos != std::string_view::npos ? std::string(Spec.substr(SlashPos + 1)) : std::string{}; @@ -199,7 +262,11 @@ S3Hydrator::Configure(const HydrationConfig& Config) ZEN_ASSERT(!m_Bucket.empty()); - std::string Region = GetEnvVariable("AWS_DEFAULT_REGION"); + std::string Region = std::string(Settings["region"].AsString()); + if (Region.empty()) + { + Region = GetEnvVariable("AWS_DEFAULT_REGION"); + } if (Region.empty()) { Region = GetEnvVariable("AWS_REGION"); @@ -230,10 +297,12 @@ S3Hydrator::CreateS3Client() const Options.BucketName = m_Bucket; Options.Region = m_Region; - if (!m_Config.S3Endpoint.empty()) + CbObjectView Settings = m_Config.Options["settings"].AsObjectView(); + std::string_view Endpoint = Settings["endpoint"].AsString(); + if (!Endpoint.empty()) { - Options.Endpoint = m_Config.S3Endpoint; - Options.PathStyle = m_Config.S3PathStyle; + Options.Endpoint = std::string(Endpoint); + Options.PathStyle = Settings["path-style"].AsBool(); } if (m_CredentialProvider) @@ -245,6 +314,8 @@ S3Hydrator::CreateS3Client() const Options.Credentials = m_Credentials; } + Options.HttpSettings.MaximumInMemoryDownloadSize = 16u * 1024u; + return S3Client(Options); } @@ -275,11 +346,11 @@ S3Hydrator::Dehydrate() try { - S3Client Client = CreateS3Client(); - std::string FolderName = BuildTimestampFolderName(); - uint64_t TotalBytes = 0; - uint32_t FileCount = 0; - std::chrono::steady_clock::time_point UploadStart = std::chrono::steady_clock::now(); + S3Client Client = CreateS3Client(); + std::string FolderName = BuildTimestampFolderName(); + uint64_t TotalBytes = 0; + uint32_t FileCount = 0; + Stopwatch Timer; DirectoryContent DirContent; GetDirectoryContent(m_Config.ServerStateDir, DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::Recursive, DirContent); @@ -295,13 +366,20 @@ S3Hydrator::Dehydrate() AbsPath.string(), m_Config.ServerStateDir.string()); } + if (*RelPath.begin() == ".sentry-native") + { + continue; + } std::string Key = MakeObjectKey(FolderName, RelPath); BasicFile File(AbsPath, BasicFile::Mode::kRead); uint64_t FileSize = File.FileSize(); - S3Result UploadResult = - Client.PutObjectMultipart(Key, FileSize, [&File](uint64_t Offset, uint64_t Size) { return File.ReadRange(Offset, Size); }); + S3Result UploadResult = Client.PutObjectMultipart( + Key, + FileSize, + [&File](uint64_t Offset, uint64_t Size) { return File.ReadRange(Offset, Size); }, + MultipartChunkSize); if (!UploadResult.IsSuccess()) { throw zen::runtime_error("Failed to upload '{}' to S3: {}", Key, UploadResult.Error); @@ -312,8 +390,7 @@ S3Hydrator::Dehydrate() } // Write current-state.json - int64_t UploadDurationMs = - std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - UploadStart).count(); + uint64_t UploadDurationMs = Timer.GetElapsedTimeMs(); UtcTime Now = UtcTime::Now(); std::string UploadTimeUtc = fmt::format("{:04d}-{:02d}-{:02d}T{:02d}:{:02d}:{:02d}.{:03d}Z", @@ -346,7 +423,7 @@ S3Hydrator::Dehydrate() throw zen::runtime_error("Failed to write current-state.json to '{}': {}", MetaKey, MetaUploadResult.Error); } - ZEN_INFO("Dehydration complete: {} files, {} bytes, {} ms", FileCount, TotalBytes, UploadDurationMs); + ZEN_INFO("Dehydration complete: {} files, {}, {}", FileCount, NiceBytes(TotalBytes), NiceTimeSpanMs(UploadDurationMs)); } catch (std::exception& Ex) { @@ -361,6 +438,7 @@ S3Hydrator::Hydrate() { ZEN_INFO("Hydrating state from s3://{}/{} to '{}'", m_Bucket, m_KeyPrefix, m_Config.ServerStateDir); + Stopwatch Timer; const bool ForceRemoveReadOnlyFiles = true; // Clean temp dir before starting in case of leftover state from a previous failed hydration @@ -374,19 +452,17 @@ S3Hydrator::Hydrate() S3Client Client = CreateS3Client(); std::string MetaKey = m_KeyPrefix + "/current-state.json"; - S3HeadObjectResult HeadResult = Client.HeadObject(MetaKey); - if (HeadResult.Status == HeadObjectResult::NotFound) - { - throw zen::runtime_error("No state found in S3 at '{}'", MetaKey); - } - if (!HeadResult.IsSuccess()) - { - throw zen::runtime_error("Failed to check for state in S3 at '{}': {}", MetaKey, HeadResult.Error); - } - S3GetObjectResult MetaResult = Client.GetObject(MetaKey); if (!MetaResult.IsSuccess()) { + if (MetaResult.Error == S3GetObjectResult::NotFoundErrorText) + { + ZEN_INFO("No state found in S3 at {}", MetaKey); + + ZEN_DEBUG("Wiping server state '{}'", m_Config.ServerStateDir); + CleanDirectory(m_Config.ServerStateDir, ForceRemoveReadOnlyFiles); + return; + } throw zen::runtime_error("Failed to read current-state.json from '{}': {}", MetaKey, MetaResult.Error); } @@ -426,17 +502,17 @@ S3Hydrator::Hydrate() std::filesystem::path DestPath = MakeSafeAbsolutePath(m_Config.TempDir / std::filesystem::path(RelKey)); CreateDirectories(DestPath.parent_path()); - BasicFile DestFile(DestPath, BasicFile::Mode::kTruncate); - DestFile.SetFileSize(Obj.Size); - - if (Obj.Size > 0) + if (Obj.Size > MultipartChunkSize) { + BasicFile DestFile(DestPath, BasicFile::Mode::kTruncate); + DestFile.SetFileSize(Obj.Size); + BasicFileWriter Writer(DestFile, 64 * 1024); uint64_t Offset = 0; while (Offset < Obj.Size) { - uint64_t ChunkSize = std::min<uint64_t>(8 * 1024 * 1024, Obj.Size - Offset); + uint64_t ChunkSize = std::min<uint64_t>(MultipartChunkSize, Obj.Size - Offset); S3GetObjectResult Chunk = Client.GetObjectRange(Obj.Key, Offset, ChunkSize); if (!Chunk.IsSuccess()) { @@ -453,6 +529,34 @@ S3Hydrator::Hydrate() Writer.Flush(); } + else + { + S3GetObjectResult Chunk = Client.GetObject(Obj.Key, m_Config.TempDir); + if (!Chunk.IsSuccess()) + { + throw zen::runtime_error("Failed to download '{}' from S3: {}", Obj.Key, Chunk.Error); + } + + if (IoBufferFileReference FileRef; Chunk.Content.GetFileReference(FileRef)) + { + std::error_code Ec; + std::filesystem::path ChunkPath = PathFromHandle(FileRef.FileHandle, Ec); + if (Ec) + { + WriteFile(DestPath, Chunk.Content); + } + else + { + Chunk.Content.SetDeleteOnClose(false); + Chunk.Content = {}; + RenameFile(ChunkPath, DestPath, Ec); + } + } + else + { + WriteFile(DestPath, Chunk.Content); + } + } } // Downloaded successfully - swap into ServerStateDir @@ -465,19 +569,20 @@ S3Hydrator::Hydrate() std::mismatch(m_Config.TempDir.begin(), m_Config.TempDir.end(), m_Config.ServerStateDir.begin(), m_Config.ServerStateDir.end()); if (ItTmp != m_Config.TempDir.begin()) { - // Fast path: atomic renames - no data copying needed - for (const std::filesystem::directory_entry& Entry : std::filesystem::directory_iterator(m_Config.TempDir)) + DirectoryContent DirContent; + GetDirectoryContent(m_Config.TempDir, DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::IncludeDirs, DirContent); + + for (const std::filesystem::path& AbsPath : DirContent.Directories) { - std::filesystem::path Dest = MakeSafeAbsolutePath(m_Config.ServerStateDir / Entry.path().filename()); - if (Entry.is_directory()) - { - RenameDirectory(Entry.path(), Dest); - } - else - { - RenameFile(Entry.path(), Dest); - } + std::filesystem::path Dest = MakeSafeAbsolutePath(m_Config.ServerStateDir / AbsPath.filename()); + RenameDirectory(AbsPath, Dest); + } + for (const std::filesystem::path& AbsPath : DirContent.Files) + { + std::filesystem::path Dest = MakeSafeAbsolutePath(m_Config.ServerStateDir / AbsPath.filename()); + RenameFile(AbsPath, Dest); } + ZEN_DEBUG("Cleaning temp dir '{}'", m_Config.TempDir); CleanDirectory(m_Config.TempDir, ForceRemoveReadOnlyFiles); } @@ -491,7 +596,7 @@ S3Hydrator::Hydrate() CleanDirectory(m_Config.TempDir, ForceRemoveReadOnlyFiles); } - ZEN_INFO("Hydration complete from folder '{}'", FolderName); + ZEN_INFO("Hydration complete from folder '{}' in {}", FolderName, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); } catch (std::exception& Ex) { @@ -513,19 +618,41 @@ S3Hydrator::Hydrate() std::unique_ptr<HydrationStrategyBase> CreateHydrator(const HydrationConfig& Config) { - if (StrCaseCompare(Config.TargetSpecification.substr(0, FileHydratorPrefix.length()), FileHydratorPrefix) == 0) + if (!Config.TargetSpecification.empty()) + { + if (StrCaseCompare(Config.TargetSpecification.substr(0, FileHydratorPrefix.length()), FileHydratorPrefix) == 0) + { + std::unique_ptr<HydrationStrategyBase> Hydrator = std::make_unique<FileHydrator>(); + Hydrator->Configure(Config); + return Hydrator; + } + if (StrCaseCompare(Config.TargetSpecification.substr(0, S3HydratorPrefix.length()), S3HydratorPrefix) == 0) + { + std::unique_ptr<HydrationStrategyBase> Hydrator = std::make_unique<S3Hydrator>(); + Hydrator->Configure(Config); + return Hydrator; + } + throw std::runtime_error(fmt::format("Unknown hydration strategy: {}", Config.TargetSpecification)); + } + + std::string_view Type = Config.Options["type"].AsString(); + if (Type == FileHydratorType) { std::unique_ptr<HydrationStrategyBase> Hydrator = std::make_unique<FileHydrator>(); Hydrator->Configure(Config); return Hydrator; } - if (StrCaseCompare(Config.TargetSpecification.substr(0, S3HydratorPrefix.length()), S3HydratorPrefix) == 0) + if (Type == S3HydratorType) { std::unique_ptr<HydrationStrategyBase> Hydrator = std::make_unique<S3Hydrator>(); Hydrator->Configure(Config); return Hydrator; } - throw std::runtime_error(fmt::format("Unknown hydration strategy: {}", Config.TargetSpecification)); + if (!Type.empty()) + { + throw zen::runtime_error("Unknown hydration target type '{}'", Type); + } + throw zen::runtime_error("No hydration target configured"); } #if ZEN_WITH_TESTS @@ -607,6 +734,12 @@ namespace { AddFile("file_a.bin", CreateSemiRandomBlob(1024)); AddFile("subdir/file_b.bin", CreateSemiRandomBlob(2048)); AddFile("subdir/nested/file_c.bin", CreateSemiRandomBlob(512)); + AddFile("subdir/nested/file_d.bin", CreateSemiRandomBlob(512)); + AddFile("subdir/nested/file_e.bin", CreateSemiRandomBlob(512)); + AddFile("subdir/nested/file_f.bin", CreateSemiRandomBlob(512)); + AddFile("subdir/nested/medium.bulk", CreateSemiRandomBlob(256u * 1024u)); + AddFile("subdir/nested/big.bulk", CreateSemiRandomBlob(512u * 1024u)); + AddFile("subdir/nested/huge.bulk", CreateSemiRandomBlob(9u * 1024u * 1024u)); return Files; } @@ -844,12 +977,16 @@ TEST_CASE("hydration.s3.dehydrate_hydrate") auto TestFiles = CreateTestTree(ServerStateDir); HydrationConfig Config; - Config.ServerStateDir = ServerStateDir; - Config.TempDir = HydrationTemp; - Config.ModuleId = ModuleId; - Config.TargetSpecification = "s3://zen-hydration-test"; - Config.S3Endpoint = Minio.Endpoint(); - Config.S3PathStyle = true; + Config.ServerStateDir = ServerStateDir; + Config.TempDir = HydrationTemp; + Config.ModuleId = ModuleId; + std::string ConfigJson = + fmt::format(R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test","endpoint":"{}","path-style":true}}}})", + Minio.Endpoint()); + std::string ParseError; + CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError); + ZEN_ASSERT(ParseError.empty() && Root.IsObject()); + Config.Options = std::move(Root).AsObject(); // Dehydrate: upload server state to MinIO { @@ -902,12 +1039,18 @@ TEST_CASE("hydration.s3.current_state_json_selects_latest_folder") const std::string ModuleId = "s3test_folder_select"; HydrationConfig Config; - Config.ServerStateDir = ServerStateDir; - Config.TempDir = HydrationTemp; - Config.ModuleId = ModuleId; - Config.TargetSpecification = "s3://zen-hydration-test"; - Config.S3Endpoint = Minio.Endpoint(); - Config.S3PathStyle = true; + Config.ServerStateDir = ServerStateDir; + Config.TempDir = HydrationTemp; + Config.ModuleId = ModuleId; + { + std::string ConfigJson = + fmt::format(R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test","endpoint":"{}","path-style":true}}}})", + Minio.Endpoint()); + std::string ParseError; + CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError); + ZEN_ASSERT(ParseError.empty() && Root.IsObject()); + Config.Options = std::move(Root).AsObject(); + } // v1: dehydrate without a marker file CreateTestTree(ServerStateDir); @@ -972,13 +1115,19 @@ TEST_CASE("hydration.s3.module_isolation") CreateDirectories(TempPath); ModuleData Data; - Data.Config.ServerStateDir = StateDir; - Data.Config.TempDir = TempPath; - Data.Config.ModuleId = ModuleId; - Data.Config.TargetSpecification = "s3://zen-hydration-test"; - Data.Config.S3Endpoint = Minio.Endpoint(); - Data.Config.S3PathStyle = true; - Data.Files = CreateTestTree(StateDir); + Data.Config.ServerStateDir = StateDir; + Data.Config.TempDir = TempPath; + Data.Config.ModuleId = ModuleId; + { + std::string ConfigJson = + fmt::format(R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test","endpoint":"{}","path-style":true}}}})", + Minio.Endpoint()); + std::string ParseError; + CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError); + ZEN_ASSERT(ParseError.empty() && Root.IsObject()); + Data.Config.Options = std::move(Root).AsObject(); + } + Data.Files = CreateTestTree(StateDir); std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Data.Config); Hydrator->Dehydrate(); @@ -1015,7 +1164,8 @@ TEST_CASE("hydration.s3.concurrent") ScopedEnvVar EnvAccessKey("AWS_ACCESS_KEY_ID", Minio.RootUser()); ScopedEnvVar EnvSecretKey("AWS_SECRET_ACCESS_KEY", Minio.RootPassword()); - constexpr int kModuleCount = 4; + constexpr int kModuleCount = 16; + constexpr int kThreadCount = 4; ScopedTemporaryDirectory TempDir; @@ -1034,18 +1184,24 @@ TEST_CASE("hydration.s3.concurrent") CreateDirectories(StateDir); CreateDirectories(TempPath); - Modules[I].Config.ServerStateDir = StateDir; - Modules[I].Config.TempDir = TempPath; - Modules[I].Config.ModuleId = ModuleId; - Modules[I].Config.TargetSpecification = "s3://zen-hydration-test"; - Modules[I].Config.S3Endpoint = Minio.Endpoint(); - Modules[I].Config.S3PathStyle = true; - Modules[I].Files = CreateTestTree(StateDir); + Modules[I].Config.ServerStateDir = StateDir; + Modules[I].Config.TempDir = TempPath; + Modules[I].Config.ModuleId = ModuleId; + { + std::string ConfigJson = + fmt::format(R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test","endpoint":"{}","path-style":true}}}})", + Minio.Endpoint()); + std::string ParseError; + CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError); + ZEN_ASSERT(ParseError.empty() && Root.IsObject()); + Modules[I].Config.Options = std::move(Root).AsObject(); + } + Modules[I].Files = CreateTestTree(StateDir); } // Concurrent dehydrate { - WorkerThreadPool Pool(kModuleCount, "hydration_s3_dehy"); + WorkerThreadPool Pool(kThreadCount, "hydration_s3_dehy"); std::atomic<bool> AbortFlag{false}; std::atomic<bool> PauseFlag{false}; ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); @@ -1063,7 +1219,7 @@ TEST_CASE("hydration.s3.concurrent") // Concurrent hydrate { - WorkerThreadPool Pool(kModuleCount, "hydration_s3_hy"); + WorkerThreadPool Pool(kThreadCount, "hydration_s3_hy"); std::atomic<bool> AbortFlag{false}; std::atomic<bool> PauseFlag{false}; ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); @@ -1116,12 +1272,18 @@ TEST_CASE("hydration.s3.no_prior_state") WriteFile(ServerStateDir / "stale.bin", CreateSemiRandomBlob(256)); HydrationConfig Config; - Config.ServerStateDir = ServerStateDir; - Config.TempDir = HydrationTemp; - Config.ModuleId = "s3test_no_prior"; - Config.TargetSpecification = "s3://zen-hydration-test"; - Config.S3Endpoint = Minio.Endpoint(); - Config.S3PathStyle = true; + Config.ServerStateDir = ServerStateDir; + Config.TempDir = HydrationTemp; + Config.ModuleId = "s3test_no_prior"; + { + std::string ConfigJson = + fmt::format(R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test","endpoint":"{}","path-style":true}}}})", + Minio.Endpoint()); + std::string ParseError; + CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError); + ZEN_ASSERT(ParseError.empty() && Root.IsObject()); + Config.Options = std::move(Root).AsObject(); + } std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config); Hydrator->Hydrate(); @@ -1159,12 +1321,71 @@ TEST_CASE("hydration.s3.path_prefix") std::vector<std::pair<std::filesystem::path, IoBuffer>> TestFiles = CreateTestTree(ServerStateDir); HydrationConfig Config; - Config.ServerStateDir = ServerStateDir; - Config.TempDir = HydrationTemp; - Config.ModuleId = "s3test_prefix"; - Config.TargetSpecification = "s3://zen-hydration-test/team/project"; - Config.S3Endpoint = Minio.Endpoint(); - Config.S3PathStyle = true; + Config.ServerStateDir = ServerStateDir; + Config.TempDir = HydrationTemp; + Config.ModuleId = "s3test_prefix"; + { + std::string ConfigJson = + fmt::format(R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test/team/project","endpoint":"{}","path-style":true}}}})", + Minio.Endpoint()); + std::string ParseError; + CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError); + ZEN_ASSERT(ParseError.empty() && Root.IsObject()); + Config.Options = std::move(Root).AsObject(); + } + + { + std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config); + Hydrator->Dehydrate(); + } + + CleanDirectory(ServerStateDir, true); + + { + std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config); + Hydrator->Hydrate(); + } + + VerifyTree(ServerStateDir, TestFiles); +} + +TEST_CASE("hydration.s3.options_region_override") +{ + // Verify that 'region' in Options["settings"] takes precedence over AWS_DEFAULT_REGION env var. + // AWS_DEFAULT_REGION is set to a bogus value; hydration must succeed using the region from Options. + + MinioProcessOptions MinioOpts; + MinioOpts.Port = 19016; + MinioProcess Minio(MinioOpts); + Minio.SpawnMinioServer(); + Minio.CreateBucket("zen-hydration-test"); + + ScopedEnvVar EnvAccessKey("AWS_ACCESS_KEY_ID", Minio.RootUser()); + ScopedEnvVar EnvSecretKey("AWS_SECRET_ACCESS_KEY", Minio.RootPassword()); + ScopedEnvVar EnvRegion("AWS_DEFAULT_REGION", "wrong-region"); + + ScopedTemporaryDirectory TempDir; + + std::filesystem::path ServerStateDir = TempDir.Path() / "server_state"; + std::filesystem::path HydrationTemp = TempDir.Path() / "hydration_temp"; + CreateDirectories(ServerStateDir); + CreateDirectories(HydrationTemp); + + auto TestFiles = CreateTestTree(ServerStateDir); + + HydrationConfig Config; + Config.ServerStateDir = ServerStateDir; + Config.TempDir = HydrationTemp; + Config.ModuleId = "s3test_region_override"; + { + std::string ConfigJson = fmt::format( + R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test","endpoint":"{}","path-style":true,"region":"us-east-1"}}}})", + Minio.Endpoint()); + std::string ParseError; + CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError); + ZEN_ASSERT(ParseError.empty() && Root.IsObject()); + Config.Options = std::move(Root).AsObject(); + } { std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config); diff --git a/src/zenserver/hub/hydration.h b/src/zenserver/hub/hydration.h index d29ffe5c0..19a96c248 100644 --- a/src/zenserver/hub/hydration.h +++ b/src/zenserver/hub/hydration.h @@ -2,6 +2,8 @@ #pragma once +#include <zencore/compactbinary.h> + #include <filesystem> namespace zen { @@ -16,12 +18,8 @@ struct HydrationConfig std::string ModuleId; // Back-end specific target specification (e.g. S3 bucket, file path, etc) std::string TargetSpecification; - - // Optional S3 endpoint override (e.g. "http://localhost:9000" for MinIO). - std::string S3Endpoint; - // Use path-style S3 URLs (endpoint/bucket/key) instead of virtual-hosted-style - // (bucket.endpoint/key). Required for MinIO and other non-AWS endpoints. - bool S3PathStyle = false; + // Full config object when using --hub-hydration-target-config (mutually exclusive with TargetSpecification) + CbObject Options; }; /** diff --git a/src/zenserver/hub/storageserverinstance.cpp b/src/zenserver/hub/storageserverinstance.cpp index 802606f6a..0c9354990 100644 --- a/src/zenserver/hub/storageserverinstance.cpp +++ b/src/zenserver/hub/storageserverinstance.cpp @@ -157,7 +157,8 @@ StorageServerInstance::Hydrate() HydrationConfig Config{.ServerStateDir = m_BaseDir, .TempDir = m_TempDir, .ModuleId = m_ModuleId, - .TargetSpecification = m_Config.HydrationTargetSpecification}; + .TargetSpecification = m_Config.HydrationTargetSpecification, + .Options = m_Config.HydrationOptions}; std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config); @@ -170,7 +171,8 @@ StorageServerInstance::Dehydrate() HydrationConfig Config{.ServerStateDir = m_BaseDir, .TempDir = m_TempDir, .ModuleId = m_ModuleId, - .TargetSpecification = m_Config.HydrationTargetSpecification}; + .TargetSpecification = m_Config.HydrationTargetSpecification, + .Options = m_Config.HydrationOptions}; std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config); diff --git a/src/zenserver/hub/storageserverinstance.h b/src/zenserver/hub/storageserverinstance.h index 33646c375..1b0078d87 100644 --- a/src/zenserver/hub/storageserverinstance.h +++ b/src/zenserver/hub/storageserverinstance.h @@ -2,6 +2,7 @@ #pragma once +#include <zencore/compactbinary.h> #include <zenutil/zenserverprocess.h> #include <atomic> @@ -24,6 +25,7 @@ public: uint16_t BasePort; std::filesystem::path HydrationTempPath; std::string HydrationTargetSpecification; + CbObject HydrationOptions; uint32_t HttpThreadCount = 0; // Automatic int CoreLimit = 0; // Automatic std::filesystem::path ConfigPath; diff --git a/src/zenserver/hub/zenhubserver.cpp b/src/zenserver/hub/zenhubserver.cpp index 2d0d5398b..499586abc 100644 --- a/src/zenserver/hub/zenhubserver.cpp +++ b/src/zenserver/hub/zenhubserver.cpp @@ -6,8 +6,10 @@ #include "httphubservice.h" #include "hub.h" +#include <zencore/compactbinary.h> #include <zencore/config.h> #include <zencore/except.h> +#include <zencore/except_fmt.h> #include <zencore/filesystem.h> #include <zencore/fmtutils.h> #include <zencore/memory/llm.h> @@ -142,6 +144,14 @@ ZenHubServerConfigurator::AddCliOptions(cxxopts::Options& Options) cxxopts::value(m_ServerOptions.HydrationTargetSpecification), "<hydration-target-spec>"); + Options.add_option("hub", + "", + "hub-hydration-target-config", + "Path to JSON file specifying the hydration target (mutually exclusive with " + "--hub-hydration-target-spec). Supported types: 'file', 's3'.", + cxxopts::value(m_ServerOptions.HydrationTargetConfigPath), + "<path>"); + #if ZEN_PLATFORM_WINDOWS Options.add_option("hub", "", @@ -269,6 +279,16 @@ ZenHubServerConfigurator::ValidateOptions() m_ServerOptions.HubProvisionMemoryLimitPercent), {}); } + if (!m_ServerOptions.HydrationTargetSpecification.empty() && !m_ServerOptions.HydrationTargetConfigPath.empty()) + { + throw OptionParseException("'--hub-hydration-target-spec' and '--hub-hydration-target-config' are mutually exclusive", {}); + } + if (!m_ServerOptions.HydrationTargetConfigPath.empty() && !std::filesystem::exists(m_ServerOptions.HydrationTargetConfigPath)) + { + throw OptionParseException( + fmt::format("'--hub-hydration-target-config': file not found: '{}'", m_ServerOptions.HydrationTargetConfigPath.string()), + {}); + } } /////////////////////////////////////////////////////////////////////////// @@ -479,6 +499,29 @@ ZenHubServer::InitializeServices(const ZenHubServerConfig& ServerConfig) }, .ResourceLimits = ResolveLimits(ServerConfig)}; + if (!ServerConfig.HydrationTargetConfigPath.empty()) + { + FileContents Contents = ReadFile(ServerConfig.HydrationTargetConfigPath); + if (!Contents) + { + throw zen::runtime_error("Failed to read hydration config '{}': {}", + ServerConfig.HydrationTargetConfigPath.string(), + Contents.ErrorCode.message()); + } + IoBuffer Buffer(Contents.Flatten()); + std::string_view JsonText(static_cast<const char*>(Buffer.GetData()), Buffer.GetSize()); + + std::string ParseError; + CbFieldIterator Root = LoadCompactBinaryFromJson(JsonText, ParseError); + if (!ParseError.empty() || !Root.IsObject()) + { + throw zen::runtime_error("Failed to parse hydration config '{}': {}", + ServerConfig.HydrationTargetConfigPath.string(), + ParseError.empty() ? "root must be a JSON object" : ParseError); + } + HubConfig.HydrationOptions = std::move(Root).AsObject(); + } + m_Hub = std::make_unique<Hub>( std::move(HubConfig), ZenServerEnvironment(ZenServerEnvironment::Hub, diff --git a/src/zenserver/hub/zenhubserver.h b/src/zenserver/hub/zenhubserver.h index 9660e9a49..b976c52b3 100644 --- a/src/zenserver/hub/zenhubserver.h +++ b/src/zenserver/hub/zenhubserver.h @@ -49,6 +49,7 @@ struct ZenHubServerConfig : public ZenServerConfig int HubInstanceCoreLimit = 0; // Automatic std::filesystem::path HubInstanceConfigPath; // Path to Lua config file std::string HydrationTargetSpecification; // hydration/dehydration target specification + std::filesystem::path HydrationTargetConfigPath; // path to JSON config file (mutually exclusive with HydrationTargetSpecification) ZenHubWatchdogConfig WatchdogConfig; uint64_t HubProvisionDiskLimitBytes = 0; uint32_t HubProvisionDiskLimitPercent = 0; diff --git a/src/zenutil/cloud/s3client.cpp b/src/zenutil/cloud/s3client.cpp index c3404699d..d9fde05d9 100644 --- a/src/zenutil/cloud/s3client.cpp +++ b/src/zenutil/cloud/s3client.cpp @@ -137,6 +137,8 @@ namespace { } // namespace +std::string_view S3GetObjectResult::NotFoundErrorText = "Not found"; + S3Client::S3Client(const S3ClientOptions& Options) : m_Log(logging::Get("s3")) , m_BucketName(Options.BucketName) @@ -145,13 +147,7 @@ S3Client::S3Client(const S3ClientOptions& Options) , m_PathStyle(Options.PathStyle) , m_Credentials(Options.Credentials) , m_CredentialProvider(Options.CredentialProvider) -, m_HttpClient(BuildEndpoint(), - HttpClientSettings{ - .LogCategory = "s3", - .ConnectTimeout = Options.ConnectTimeout, - .Timeout = Options.Timeout, - .RetryCount = Options.RetryCount, - }) +, m_HttpClient(BuildEndpoint(), Options.HttpSettings) { m_Host = BuildHostHeader(); ZEN_INFO("S3 client configured for bucket '{}' in region '{}' (endpoint: {}, {})", @@ -347,15 +343,20 @@ S3Client::PutObject(std::string_view Key, IoBuffer Content) } S3GetObjectResult -S3Client::GetObject(std::string_view Key) +S3Client::GetObject(std::string_view Key, const std::filesystem::path& TempFilePath) { std::string Path = KeyToPath(Key); HttpClient::KeyValueMap Headers = SignRequest("GET", Path, "", EmptyPayloadHash); - HttpClient::Response Response = m_HttpClient.Get(Path, Headers); + HttpClient::Response Response = m_HttpClient.Download(Path, TempFilePath, Headers); if (!Response.IsSuccess()) { + if (Response.StatusCode == HttpResponseCode::NotFound) + { + return S3GetObjectResult{S3Result{.Error = std::string(S3GetObjectResult::NotFoundErrorText)}, {}}; + } + std::string Err = Response.ErrorMessage("S3 GET failed"); ZEN_WARN("S3 GET '{}' failed: {}", Key, Err); return S3GetObjectResult{S3Result{std::move(Err)}, {}}; @@ -377,6 +378,11 @@ S3Client::GetObjectRange(std::string_view Key, uint64_t RangeStart, uint64_t Ran HttpClient::Response Response = m_HttpClient.Get(Path, Headers); if (!Response.IsSuccess()) { + if (Response.StatusCode == HttpResponseCode::NotFound) + { + return S3GetObjectResult{S3Result{.Error = std::string(S3GetObjectResult::NotFoundErrorText)}, {}}; + } + std::string Err = Response.ErrorMessage("S3 GET range failed"); ZEN_WARN("S3 GET range '{}' [{}-{}] failed: {}", Key, RangeStart, RangeStart + RangeSize - 1, Err); return S3GetObjectResult{S3Result{std::move(Err)}, {}}; @@ -749,7 +755,7 @@ S3Client::PutObjectMultipart(std::string_view Key, return PutObject(Key, TotalSize > 0 ? FetchRange(0, TotalSize) : IoBuffer{}); } - ZEN_INFO("S3 multipart upload '{}': {} bytes in ~{} parts", Key, TotalSize, (TotalSize + PartSize - 1) / PartSize); + ZEN_DEBUG("S3 multipart upload '{}': {} bytes in ~{} parts", Key, TotalSize, (TotalSize + PartSize - 1) / PartSize); S3CreateMultipartUploadResult InitResult = CreateMultipartUpload(Key); if (!InitResult) @@ -797,7 +803,7 @@ S3Client::PutObjectMultipart(std::string_view Key, throw; } - ZEN_INFO("S3 multipart upload '{}' completed ({} parts, {} bytes)", Key, PartETags.size(), TotalSize); + ZEN_DEBUG("S3 multipart upload '{}' completed ({} parts, {} bytes)", Key, PartETags.size(), TotalSize); return {}; } diff --git a/src/zenutil/include/zenutil/cloud/s3client.h b/src/zenutil/include/zenutil/cloud/s3client.h index bd30aa8a2..f1f0df0e4 100644 --- a/src/zenutil/include/zenutil/cloud/s3client.h +++ b/src/zenutil/include/zenutil/cloud/s3client.h @@ -35,9 +35,7 @@ struct S3ClientOptions /// Overrides the static Credentials field. Ref<ImdsCredentialProvider> CredentialProvider; - std::chrono::milliseconds ConnectTimeout{5000}; - std::chrono::milliseconds Timeout{}; - uint8_t RetryCount = 3; + HttpClientSettings HttpSettings = {.LogCategory = "s3", .ConnectTimeout = std::chrono::milliseconds(5000), .RetryCount = 3}; }; struct S3ObjectInfo @@ -70,6 +68,8 @@ struct S3GetObjectResult : S3Result IoBuffer Content; std::string_view AsText() const { return std::string_view(reinterpret_cast<const char*>(Content.GetData()), Content.GetSize()); } + + static std::string_view NotFoundErrorText; }; /// Result of HeadObject - carries object metadata and existence status. @@ -119,7 +119,7 @@ public: S3Result PutObject(std::string_view Key, IoBuffer Content); /// Download an object from S3 - S3GetObjectResult GetObject(std::string_view Key); + S3GetObjectResult GetObject(std::string_view Key, const std::filesystem::path& TempFilePath = {}); /// Download a byte range of an object from S3 /// @param RangeStart First byte offset (inclusive) |