aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2026-04-21 16:26:57 +0200
committerGitHub Enterprise <[email protected]>2026-04-21 16:26:57 +0200
commit6b59d3d37dcc6320929df2f0074f9a1cb506d1fd (patch)
tree2ddd317e381c29a97c666e9d72cf5d614a13f6f8 /src
parentzen CLI security review fixes (#974) (diff)
downloadarchived-zen-6b59d3d37dcc6320929df2f0074f9a1cb506d1fd.tar.xz
archived-zen-6b59d3d37dcc6320929df2f0074f9a1cb506d1fd.zip
improved s3 hydration (#997)
- Improvement: Hub shares a single S3 client and IMDS credential provider across all modules, reducing IMDS load and surviving transient IMDS blips during bulk provisioning - Improvement: Hub validates hydration config at startup; bad `--hub-hydration-target-spec` or `--hub-hydration-target-config` now fails `zen hub` at boot instead of per-module at first hydrate - Improvement: S3 hydration multipart chunk size configurable via `settings.chunk-size` (default 32 MiB) - Improvement: S3 client extracts `<Error><Code>` and `<Message>` from XML error bodies (previously logged as `<unhandled content format>`) - Improvement: S3 client fails fast with a "no credentials available" error when AWS credentials are missing, instead of sending an unsigned request that S3 rejects with a generic 400 - Improvement: IMDS credential provider retries transient connection failures (up to 3 attempts with backoff) - Improvement: HTTP clients with `RetryCount > 0` also retry on `CURLE_COULDNT_CONNECT`
Diffstat (limited to 'src')
-rw-r--r--src/zenhttp/clients/httpclientcurl.cpp1
-rw-r--r--src/zenhttp/httpclient.cpp1
-rw-r--r--src/zenserver/hub/hub.cpp41
-rw-r--r--src/zenserver/hub/hub.h5
-rw-r--r--src/zenserver/hub/hydration.cpp2137
-rw-r--r--src/zenserver/hub/hydration.h49
-rw-r--r--src/zenserver/hub/storageserverinstance.cpp20
-rw-r--r--src/zenserver/hub/storageserverinstance.h8
-rw-r--r--src/zenutil/cloud/imdscredentials.cpp1
-rw-r--r--src/zenutil/cloud/s3client.cpp184
-rw-r--r--src/zenutil/include/zenutil/cloud/s3client.h24
11 files changed, 1287 insertions, 1184 deletions
diff --git a/src/zenhttp/clients/httpclientcurl.cpp b/src/zenhttp/clients/httpclientcurl.cpp
index ddf6cb58f..eee80c269 100644
--- a/src/zenhttp/clients/httpclientcurl.cpp
+++ b/src/zenhttp/clients/httpclientcurl.cpp
@@ -448,6 +448,7 @@ CurlHttpClient::ShouldRetry(const CurlResult& Result)
{
case CURLE_OK:
break;
+ case CURLE_COULDNT_CONNECT:
case CURLE_RECV_ERROR:
case CURLE_SEND_ERROR:
case CURLE_OPERATION_TIMEDOUT:
diff --git a/src/zenhttp/httpclient.cpp b/src/zenhttp/httpclient.cpp
index 3da8a9220..9d5846f71 100644
--- a/src/zenhttp/httpclient.cpp
+++ b/src/zenhttp/httpclient.cpp
@@ -305,6 +305,7 @@ HttpClient::Response::ToText() const
case ZenContentType::kJavaScript:
case ZenContentType::kJSON:
case ZenContentType::kText:
+ case ZenContentType::kXML:
case ZenContentType::kYAML:
return std::string{AsText()};
diff --git a/src/zenserver/hub/hub.cpp b/src/zenserver/hub/hub.cpp
index 128d3ed35..3fbb7a26e 100644
--- a/src/zenserver/hub/hub.cpp
+++ b/src/zenserver/hub/hub.cpp
@@ -183,20 +183,22 @@ Hub::Hub(const Configuration& Config, ZenServerEnvironment&& RunEnvironment, Asy
Config.OptionalProvisionWorkerPool != Config.OptionalHydrationWorkerPool || Config.OptionalProvisionWorkerPool == nullptr,
"Provision and hydration worker pools must be distinct to avoid deadlocks");
+ HydrationBase::Configuration HydrationConfig;
if (!m_Config.HydrationTargetSpecification.empty())
{
- m_HydrationTargetSpecification = m_Config.HydrationTargetSpecification;
+ HydrationConfig.TargetSpecification = m_Config.HydrationTargetSpecification;
}
else if (!m_Config.HydrationOptions)
{
std::filesystem::path FileHydrationPath = m_RunEnvironment.CreateChildDir("hydration_storage");
ZEN_INFO("using file hydration path: '{}'", FileHydrationPath);
- m_HydrationTargetSpecification = fmt::format("file://{}", WideToUtf8(FileHydrationPath.native()));
+ HydrationConfig.TargetSpecification = fmt::format("file://{}", WideToUtf8(FileHydrationPath.native()));
}
else
{
- m_HydrationOptions = m_Config.HydrationOptions;
+ HydrationConfig.Options = m_Config.HydrationOptions;
}
+ m_Hydration = InitHydration(HydrationConfig);
m_HydrationTempPath = m_RunEnvironment.CreateChildDir("hydration_temp");
ZEN_INFO("using hydration temp path: '{}'", m_HydrationTempPath);
@@ -324,19 +326,18 @@ Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo)
{
auto NewInstance = std::make_unique<StorageServerInstance>(
m_RunEnvironment,
- StorageServerInstance::Configuration{.BasePort = GetInstanceIndexAssignedPort(ActiveInstanceIndex),
- .StateDir = m_RunEnvironment.CreateChildDir(ModuleId),
- .TempDir = m_HydrationTempPath / ModuleId,
- .HydrationTargetSpecification = m_HydrationTargetSpecification,
- .HydrationOptions = m_HydrationOptions,
- .HttpThreadCount = m_Config.InstanceHttpThreadCount,
- .CoreLimit = m_Config.InstanceCoreLimit,
- .ConfigPath = m_Config.InstanceConfigPath,
- .Malloc = m_Config.InstanceMalloc,
- .Trace = m_Config.InstanceTrace,
- .TraceHost = m_Config.InstanceTraceHost,
- .TraceFile = m_Config.InstanceTraceFile,
- .OptionalWorkerPool = m_Config.OptionalHydrationWorkerPool},
+ *m_Hydration,
+ StorageServerInstance::Configuration{.BasePort = GetInstanceIndexAssignedPort(ActiveInstanceIndex),
+ .StateDir = m_RunEnvironment.CreateChildDir(ModuleId),
+ .TempDir = m_HydrationTempPath / ModuleId,
+ .HttpThreadCount = m_Config.InstanceHttpThreadCount,
+ .CoreLimit = m_Config.InstanceCoreLimit,
+ .ConfigPath = m_Config.InstanceConfigPath,
+ .Malloc = m_Config.InstanceMalloc,
+ .Trace = m_Config.InstanceTrace,
+ .TraceHost = m_Config.InstanceTraceHost,
+ .TraceFile = m_Config.InstanceTraceFile,
+ .OptionalWorkerPool = m_Config.OptionalHydrationWorkerPool},
ModuleId);
#if ZEN_PLATFORM_WINDOWS
@@ -1216,11 +1217,7 @@ Hub::ObliterateBackendData(std::string_view ModuleId)
std::atomic<bool> AbortFlag{false};
std::atomic<bool> PauseFlag{false};
- HydrationConfig Config{.ServerStateDir = ServerStateDir,
- .TempDir = TempDir,
- .ModuleId = std::string(ModuleId),
- .TargetSpecification = m_HydrationTargetSpecification,
- .Options = m_HydrationOptions};
+ HydrationConfig Config{.ServerStateDir = ServerStateDir, .TempDir = TempDir, .ModuleId = std::string(ModuleId)};
if (m_Config.OptionalHydrationWorkerPool)
{
Config.Threading.emplace(HydrationConfig::ThreadingOptions{.WorkerPool = m_Config.OptionalHydrationWorkerPool,
@@ -1228,7 +1225,7 @@ Hub::ObliterateBackendData(std::string_view ModuleId)
.PauseFlag = &PauseFlag});
}
- std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
+ std::unique_ptr<HydrationStrategyBase> Hydrator = m_Hydration->CreateHydrator(Config);
Hydrator->Obliterate();
}
diff --git a/src/zenserver/hub/hub.h b/src/zenserver/hub/hub.h
index 040f34af5..40d046ce0 100644
--- a/src/zenserver/hub/hub.h
+++ b/src/zenserver/hub/hub.h
@@ -199,9 +199,8 @@ private:
AsyncModuleStateChangeCallbackFunc m_ModuleStateChangeCallback;
- std::string m_HydrationTargetSpecification;
- CbObject m_HydrationOptions;
- std::filesystem::path m_HydrationTempPath;
+ std::unique_ptr<HydrationBase> m_Hydration;
+ std::filesystem::path m_HydrationTempPath;
#if ZEN_PLATFORM_WINDOWS
JobObject m_JobObject;
diff --git a/src/zenserver/hub/hydration.cpp b/src/zenserver/hub/hydration.cpp
index 266296fa9..4b2294b49 100644
--- a/src/zenserver/hub/hydration.cpp
+++ b/src/zenserver/hub/hydration.cpp
@@ -79,15 +79,14 @@ namespace hydration_impl {
CleanDirectory(WorkerPool, AbortFlag, PauseFlag, Path, std::vector<std::string>{}, {}, 0);
}
+ ///////////////////////////////////////////////////////////////////////
+ // Per-module storage interface driven by IncrementalHydrator.
+
class StorageBase
{
public:
- virtual ~StorageBase() {}
+ virtual ~StorageBase() = default;
- virtual void Configure(std::string_view ModuleId,
- const std::filesystem::path& TempDir,
- std::string_view TargetSpecification,
- const CbObject& Options) = 0;
virtual void SaveMetadata(const CbObject& Data) = 0;
virtual CbObject LoadMetadata() = 0;
virtual CbObject GetSettings() = 0;
@@ -107,141 +106,31 @@ namespace hydration_impl {
virtual void Delete(ParallelWork& Work, WorkerThreadPool& WorkerPool) = 0;
};
- constexpr std::string_view FileHydratorPrefix = "file://";
- constexpr std::string_view FileHydratorType = "file";
-
- constexpr std::string_view S3HydratorPrefix = "s3://";
- constexpr std::string_view S3HydratorType = "s3";
-
class FileStorage : public StorageBase
{
public:
- FileStorage() {}
- virtual void Configure(std::string_view ModuleId,
- const std::filesystem::path& TempDir,
- std::string_view TargetSpecification,
- const CbObject& Options)
- {
- ZEN_UNUSED(TempDir);
- if (!TargetSpecification.empty())
- {
- m_StoragePath = Utf8ToWide(TargetSpecification.substr(FileHydratorPrefix.length()));
- if (m_StoragePath.empty())
- {
- throw zen::runtime_error("Hydration config 'file' type requires a directory path");
- }
- }
- else
- {
- CbObjectView Settings = Options["settings"].AsObjectView();
- std::string_view Path = Settings["path"].AsString();
- if (Path.empty())
- {
- throw zen::runtime_error("Hydration config 'file' type requires 'settings.path'");
- }
- m_StoragePath = Utf8ToWide(std::string(Path));
- }
- m_StoragePath = m_StoragePath / ModuleId;
- MakeSafeAbsolutePathInPlace(m_StoragePath);
-
- m_StatePathName = m_StoragePath / "current-state.cbo";
- m_CASPath = m_StoragePath / "cas";
- CreateDirectories(m_CASPath);
- }
- virtual void SaveMetadata(const CbObject& Data)
- {
- BinaryWriter Output;
- SaveCompactBinary(Output, Data);
- WriteFile(m_StatePathName, IoBuffer(IoBuffer::Wrap, Output.GetData(), Output.GetSize()));
- }
- virtual CbObject LoadMetadata()
- {
- if (!IsFile(m_StatePathName))
- {
- return {};
- }
- FileContents Content = ReadFile(m_StatePathName);
- if (Content.ErrorCode)
- {
- ThrowSystemError(Content.ErrorCode.value(), "Failed to read state file");
- }
- IoBuffer Payload = Content.Flatten();
- CbValidateError Error;
- CbObject Result = ValidateAndReadCompactBinaryObject(std::move(Payload), Error);
- if (Error != CbValidateError::None)
- {
- throw std::runtime_error(fmt::format("Failed to read {} state file. Reason: {}", m_StatePathName, ToString(Error)));
- }
- return Result;
- }
-
- virtual CbObject GetSettings() override { return {}; }
- virtual void ParseSettings(const CbObjectView& Settings) { ZEN_UNUSED(Settings); }
-
- virtual std::vector<IoHash> List()
- {
- DirectoryContent DirContent;
- GetDirectoryContent(m_CASPath, DirectoryContentFlags::IncludeFiles, DirContent);
- std::vector<IoHash> Result;
- Result.reserve(DirContent.Files.size());
- for (const std::filesystem::path& Path : DirContent.Files)
- {
- IoHash Hash;
- if (IoHash::TryParse(Path.filename().string(), Hash))
- {
- Result.push_back(Hash);
- }
- }
- return Result;
- }
-
- virtual void Put(ParallelWork& Work,
- WorkerThreadPool& WorkerPool,
- const IoHash& Hash,
- uint64_t Size,
- const std::filesystem::path& SourcePath)
- {
- ZEN_UNUSED(Size);
- Work.ScheduleWork(WorkerPool,
- [this, Hash = IoHash(Hash), SourcePath = std::filesystem::path(SourcePath)](std::atomic<bool>& AbortFlag) {
- if (!AbortFlag.load())
- {
- CopyFile(SourcePath, m_CASPath / fmt::format("{}", Hash), CopyFileOptions{.EnableClone = true});
- }
- });
- }
+ static constexpr std::string_view Prefix = "file://";
+ static constexpr std::string_view Type = "file";
- virtual void Get(ParallelWork& Work,
- WorkerThreadPool& WorkerPool,
- const IoHash& Hash,
- uint64_t Size,
- const std::filesystem::path& DestinationPath)
- {
- ZEN_UNUSED(Size);
- Work.ScheduleWork(
- WorkerPool,
- [this, Hash = IoHash(Hash), DestinationPath = std::filesystem::path(DestinationPath)](std::atomic<bool>& AbortFlag) {
- if (!AbortFlag.load())
- {
- CopyFile(m_CASPath / fmt::format("{}", Hash), DestinationPath, CopyFileOptions{.EnableClone = true});
- }
- });
- }
+ explicit FileStorage(std::filesystem::path ModulePath);
- virtual void Touch(ParallelWork& Work, WorkerThreadPool& WorkerPool, const IoHash& Hash) override
- {
- // Local filesystem backend does not use modification-time-based retention, so no refresh is needed.
- ZEN_UNUSED(Work);
- ZEN_UNUSED(WorkerPool);
- ZEN_UNUSED(Hash);
- }
-
- virtual void Delete(ParallelWork& Work, WorkerThreadPool& WorkerPool) override
- {
- ZEN_UNUSED(Work);
- ZEN_UNUSED(WorkerPool);
- DeleteDirectories(m_StoragePath);
- }
+ virtual void SaveMetadata(const CbObject& Data) override;
+ virtual CbObject LoadMetadata() override;
+ virtual CbObject GetSettings() override { return {}; }
+ virtual void ParseSettings(const CbObjectView&) override {}
+ virtual std::vector<IoHash> List() override;
+ virtual void Put(ParallelWork& Work,
+ WorkerThreadPool& WorkerPool,
+ const IoHash& Hash,
+ uint64_t Size,
+ const std::filesystem::path& SourcePath) override;
+ virtual void Get(ParallelWork& Work,
+ WorkerThreadPool& WorkerPool,
+ const IoHash& Hash,
+ uint64_t Size,
+ const std::filesystem::path& DestinationPath) override;
+ virtual void Touch(ParallelWork&, WorkerThreadPool&, const IoHash&) override {}
+ virtual void Delete(ParallelWork& Work, WorkerThreadPool& WorkerPool) override;
private:
std::filesystem::path m_StoragePath;
@@ -252,876 +141,1058 @@ namespace hydration_impl {
class S3Storage : public StorageBase
{
public:
- S3Storage() {}
+ static constexpr std::string_view Prefix = "s3://";
+ static constexpr std::string_view Type = "s3";
+ static constexpr uint64_t DefaultMultipartChunkSize = 32u * 1024u * 1024u;
- virtual void Configure(std::string_view ModuleId,
- const std::filesystem::path& TempDir,
- std::string_view TargetSpecification,
- const CbObject& Options)
- {
- m_Options = Options;
+ S3Storage(S3Client& Client, std::string KeyPrefix, std::filesystem::path TempDir, uint64_t MultipartChunkSize);
- CbObjectView Settings = m_Options["settings"].AsObjectView();
- std::string_view Spec;
- if (!TargetSpecification.empty())
- {
- Spec = TargetSpecification;
- Spec.remove_prefix(S3HydratorPrefix.size());
- }
- else
- {
- std::string_view Uri = Settings["uri"].AsString();
- if (Uri.empty())
- {
- throw zen::runtime_error("Incremental S3 hydration config requires 'settings.uri'");
- }
- Spec = Uri;
- Spec.remove_prefix(S3HydratorPrefix.size());
- }
+ virtual void SaveMetadata(const CbObject& Data) override;
+ virtual CbObject LoadMetadata() override;
+ virtual CbObject GetSettings() override;
+ virtual void ParseSettings(const CbObjectView& Settings) override;
+ virtual std::vector<IoHash> List() override;
+ virtual void Put(ParallelWork& Work,
+ WorkerThreadPool& WorkerPool,
+ const IoHash& Hash,
+ uint64_t Size,
+ const std::filesystem::path& SourcePath) override;
+ virtual void Get(ParallelWork& Work,
+ WorkerThreadPool& WorkerPool,
+ const IoHash& Hash,
+ uint64_t Size,
+ const std::filesystem::path& DestinationPath) override;
+ virtual void Touch(ParallelWork& Work, WorkerThreadPool& WorkerPool, const IoHash& Hash) override;
+ virtual void Delete(ParallelWork& Work, WorkerThreadPool& WorkerPool) override;
- size_t SlashPos = Spec.find('/');
- std::string UserPrefix = SlashPos != std::string_view::npos ? std::string(Spec.substr(SlashPos + 1)) : std::string{};
- m_Bucket = std::string(SlashPos != std::string_view::npos ? Spec.substr(0, SlashPos) : Spec);
- m_KeyPrefix = UserPrefix.empty() ? std::string(ModuleId) : UserPrefix + "/" + std::string(ModuleId);
+ private:
+ S3Client& m_Client;
+ std::string m_KeyPrefix;
+ std::filesystem::path m_TempDir;
+ uint64_t m_MultipartChunkSize;
+ };
- ZEN_ASSERT(!m_Bucket.empty());
+ ///////////////////////////////////////////////////////////////////////
+ // FileStorage implementations
- std::string Region = std::string(Settings["region"].AsString());
- if (Region.empty())
- {
- Region = GetEnvVariable("AWS_DEFAULT_REGION");
- }
- if (Region.empty())
- {
- Region = GetEnvVariable("AWS_REGION");
- }
- if (Region.empty())
- {
- Region = "us-east-1";
- }
- m_Region = std::move(Region);
+ FileStorage::FileStorage(std::filesystem::path ModulePath) : m_StoragePath(std::move(ModulePath))
+ {
+ MakeSafeAbsolutePathInPlace(m_StoragePath);
+ m_StatePathName = m_StoragePath / "current-state.cbo";
+ m_CASPath = m_StoragePath / "cas";
+ CreateDirectories(m_CASPath);
+ }
- std::string AccessKeyId = GetEnvVariable("AWS_ACCESS_KEY_ID");
- if (AccessKeyId.empty())
- {
- m_CredentialProvider = Ref<ImdsCredentialProvider>(new ImdsCredentialProvider({}));
- }
- else
- {
- m_Credentials.AccessKeyId = std::move(AccessKeyId);
- m_Credentials.SecretAccessKey = GetEnvVariable("AWS_SECRET_ACCESS_KEY");
- m_Credentials.SessionToken = GetEnvVariable("AWS_SESSION_TOKEN");
- }
- m_TempDir = TempDir;
- m_Client = CreateS3Client();
- }
+ void FileStorage::SaveMetadata(const CbObject& Data)
+ {
+ BinaryWriter Output;
+ SaveCompactBinary(Output, Data);
+ WriteFile(m_StatePathName, IoBuffer(IoBuffer::Wrap, Output.GetData(), Output.GetSize()));
+ }
- virtual void SaveMetadata(const CbObject& Data)
+ CbObject FileStorage::LoadMetadata()
+ {
+ if (!IsFile(m_StatePathName))
+ {
+ return {};
+ }
+ FileContents Content = ReadFile(m_StatePathName);
+ if (Content.ErrorCode)
+ {
+ ThrowSystemError(Content.ErrorCode.value(), "Failed to read state file");
+ }
+ IoBuffer Payload = Content.Flatten();
+ CbValidateError Error;
+ CbObject Result = ValidateAndReadCompactBinaryObject(std::move(Payload), Error);
+ if (Error != CbValidateError::None)
{
- S3Client& Client = *m_Client;
- BinaryWriter Output;
- SaveCompactBinary(Output, Data);
- IoBuffer Payload(IoBuffer::Clone, Output.GetData(), Output.GetSize());
+ throw std::runtime_error(fmt::format("Failed to read {} state file. Reason: {}", m_StatePathName, ToString(Error)));
+ }
+ return Result;
+ }
- std::string Key = m_KeyPrefix + "/incremental-state.cbo";
- S3Result Result = Client.PutObject(Key, std::move(Payload));
- if (!Result.IsSuccess())
+ std::vector<IoHash> FileStorage::List()
+ {
+ DirectoryContent DirContent;
+ GetDirectoryContent(m_CASPath, DirectoryContentFlags::IncludeFiles, DirContent);
+ std::vector<IoHash> Result;
+ Result.reserve(DirContent.Files.size());
+ for (const std::filesystem::path& Path : DirContent.Files)
+ {
+ IoHash Hash;
+ if (IoHash::TryParse(Path.filename().string(), Hash))
{
- throw zen::runtime_error("Failed to save incremental metadata to '{}': {}", Key, Result.Error);
+ Result.push_back(Hash);
}
}
+ return Result;
+ }
- virtual CbObject LoadMetadata()
- {
- S3Client& Client = *m_Client;
- std::string Key = m_KeyPrefix + "/incremental-state.cbo";
- S3GetObjectResult Result = Client.GetObject(Key);
- if (!Result.IsSuccess())
- {
- if (Result.Error == S3GetObjectResult::NotFoundErrorText)
+ void FileStorage::Put(ParallelWork& Work,
+ WorkerThreadPool& WorkerPool,
+ const IoHash& Hash,
+ uint64_t Size,
+ const std::filesystem::path& SourcePath)
+ {
+ ZEN_UNUSED(Size);
+ Work.ScheduleWork(WorkerPool,
+ [this, Hash = IoHash(Hash), SourcePath = std::filesystem::path(SourcePath)](std::atomic<bool>& AbortFlag) {
+ if (!AbortFlag.load())
+ {
+ CopyFile(SourcePath, m_CASPath / fmt::format("{}", Hash), CopyFileOptions{.EnableClone = true});
+ }
+ });
+ }
+
+ void FileStorage::Get(ParallelWork& Work,
+ WorkerThreadPool& WorkerPool,
+ const IoHash& Hash,
+ uint64_t Size,
+ const std::filesystem::path& DestinationPath)
+ {
+ ZEN_UNUSED(Size);
+ Work.ScheduleWork(
+ WorkerPool,
+ [this, Hash = IoHash(Hash), DestinationPath = std::filesystem::path(DestinationPath)](std::atomic<bool>& AbortFlag) {
+ if (!AbortFlag.load())
{
- return {};
+ CopyFile(m_CASPath / fmt::format("{}", Hash), DestinationPath, CopyFileOptions{.EnableClone = true});
}
- throw zen::runtime_error("Failed to load incremental metadata from '{}': {}", Key, Result.Error);
- }
+ });
+ }
+
+ void FileStorage::Delete(ParallelWork& Work, WorkerThreadPool& WorkerPool)
+ {
+ ZEN_UNUSED(Work);
+ ZEN_UNUSED(WorkerPool);
+ DeleteDirectories(m_StoragePath);
+ }
- CbValidateError Error;
- CbObject Meta = ValidateAndReadCompactBinaryObject(std::move(Result.Content), Error);
- if (Error != CbValidateError::None)
+ ///////////////////////////////////////////////////////////////////////
+ // S3Storage implementations
+
+ S3Storage::S3Storage(S3Client& Client, std::string KeyPrefix, std::filesystem::path TempDir, uint64_t MultipartChunkSize)
+ : m_Client(Client)
+ , m_KeyPrefix(std::move(KeyPrefix))
+ , m_TempDir(std::move(TempDir))
+ , m_MultipartChunkSize(MultipartChunkSize)
+ {
+ }
+
+ void S3Storage::SaveMetadata(const CbObject& Data)
+ {
+ BinaryWriter Output;
+ SaveCompactBinary(Output, Data);
+ IoBuffer Payload(IoBuffer::Clone, Output.GetData(), Output.GetSize());
+
+ std::string Key = m_KeyPrefix + "/incremental-state.cbo";
+ S3Result Result = m_Client.PutObject(Key, std::move(Payload));
+ if (!Result.IsSuccess())
+ {
+ throw zen::runtime_error("Failed to save incremental metadata to '{}': {}", Key, Result.Error);
+ }
+ }
+
+ CbObject S3Storage::LoadMetadata()
+ {
+ std::string Key = m_KeyPrefix + "/incremental-state.cbo";
+ S3GetObjectResult Result = m_Client.GetObject(Key);
+ if (!Result.IsSuccess())
+ {
+ if (Result.Error == S3GetObjectResult::NotFoundErrorText)
{
- throw zen::runtime_error("Failed to parse incremental metadata from '{}': {}", Key, ToString(Error));
+ return {};
}
- return Meta;
+ throw zen::runtime_error("Failed to load incremental metadata from '{}': {}", Key, Result.Error);
}
- virtual CbObject GetSettings() override
+ CbValidateError Error;
+ CbObject Meta = ValidateAndReadCompactBinaryObject(std::move(Result.Content), Error);
+ if (Error != CbValidateError::None)
{
- CbObjectWriter Writer;
- Writer << "MultipartChunkSize" << m_MultipartChunkSize;
- return Writer.Save();
+ throw zen::runtime_error("Failed to parse incremental metadata from '{}': {}", Key, ToString(Error));
}
+ return Meta;
+ }
- virtual void ParseSettings(const CbObjectView& Settings)
+ CbObject S3Storage::GetSettings()
+ {
+ CbObjectWriter Writer;
+ Writer << "MultipartChunkSize" << m_MultipartChunkSize;
+ return Writer.Save();
+ }
+
+ void S3Storage::ParseSettings(const CbObjectView& Settings)
+ {
+ m_MultipartChunkSize = Settings["MultipartChunkSize"].AsUInt64(S3Storage::DefaultMultipartChunkSize);
+ }
+
+ std::vector<IoHash> S3Storage::List()
+ {
+ std::string CasPrefix = m_KeyPrefix + "/cas/";
+ S3ListObjectsResult Result = m_Client.ListObjects(CasPrefix);
+ if (!Result.IsSuccess())
{
- m_MultipartChunkSize = Settings["MultipartChunkSize"].AsUInt64(DefaultMultipartChunkSize);
+ throw zen::runtime_error("Failed to list S3 objects under '{}': {}", CasPrefix, Result.Error);
}
- virtual std::vector<IoHash> List()
+ std::vector<IoHash> Hashes;
+ Hashes.reserve(Result.Objects.size());
+ for (const S3ObjectInfo& Obj : Result.Objects)
{
- S3Client& Client = *m_Client;
- std::string Prefix = m_KeyPrefix + "/cas/";
- S3ListObjectsResult Result = Client.ListObjects(Prefix);
- if (!Result.IsSuccess())
+ size_t LastSlash = Obj.Key.rfind('/');
+ if (LastSlash == std::string::npos)
{
- throw zen::runtime_error("Failed to list S3 objects under '{}': {}", Prefix, Result.Error);
+ continue;
}
-
- std::vector<IoHash> Hashes;
- Hashes.reserve(Result.Objects.size());
- for (const S3ObjectInfo& Obj : Result.Objects)
+ IoHash Hash;
+ if (IoHash::TryParse(Obj.Key.substr(LastSlash + 1), Hash))
{
- size_t LastSlash = Obj.Key.rfind('/');
- if (LastSlash == std::string::npos)
- {
- continue;
- }
- IoHash Hash;
- if (IoHash::TryParse(Obj.Key.substr(LastSlash + 1), Hash))
- {
- Hashes.push_back(Hash);
- }
+ Hashes.push_back(Hash);
}
- return Hashes;
}
+ return Hashes;
+ }
- virtual void Put(ParallelWork& Work,
- WorkerThreadPool& WorkerPool,
- const IoHash& Hash,
- uint64_t Size,
- const std::filesystem::path& SourcePath)
- {
- Work.ScheduleWork(
- WorkerPool,
- [this, Hash = IoHash(Hash), Size, SourcePath = std::filesystem::path(SourcePath)](std::atomic<bool>& AbortFlag) {
- if (AbortFlag.load())
- {
- return;
- }
- S3Client& Client = *m_Client;
- std::string Key = m_KeyPrefix + "/cas/" + fmt::format("{}", Hash);
+ void S3Storage::Put(ParallelWork& Work,
+ WorkerThreadPool& WorkerPool,
+ const IoHash& Hash,
+ uint64_t Size,
+ const std::filesystem::path& SourcePath)
+ {
+ Work.ScheduleWork(WorkerPool,
+ [this, Hash = IoHash(Hash), Size, SourcePath = std::filesystem::path(SourcePath)](std::atomic<bool>& AbortFlag) {
+ if (AbortFlag.load())
+ {
+ return;
+ }
+ S3Client& Client = m_Client;
+ std::string Key = m_KeyPrefix + "/cas/" + fmt::format("{}", Hash);
+
+ if (Size >= (m_MultipartChunkSize + (m_MultipartChunkSize / 4)))
+ {
+ BasicFile File(SourcePath, BasicFile::Mode::kRead);
+ S3Result Result = Client.PutObjectMultipart(
+ Key,
+ Size,
+ [&File](uint64_t Offset, uint64_t ChunkSize) { return File.ReadRange(Offset, ChunkSize); },
+ m_MultipartChunkSize);
+ if (!Result.IsSuccess())
+ {
+ throw zen::runtime_error("Failed to upload '{}' to S3: {}", Key, Result.Error);
+ }
+ }
+ else
+ {
+ BasicFile File(SourcePath, BasicFile::Mode::kRead);
+ S3Result Result = Client.PutObject(Key, File.ReadAll());
+ if (!Result.IsSuccess())
+ {
+ throw zen::runtime_error("Failed to upload '{}' to S3: {}", Key, Result.Error);
+ }
+ }
+ });
+ }
- if (Size >= (m_MultipartChunkSize + (m_MultipartChunkSize / 4)))
- {
- BasicFile File(SourcePath, BasicFile::Mode::kRead);
- S3Result Result = Client.PutObjectMultipart(
- Key,
- Size,
- [&File](uint64_t Offset, uint64_t ChunkSize) { return File.ReadRange(Offset, ChunkSize); },
- m_MultipartChunkSize);
- if (!Result.IsSuccess())
- {
- throw zen::runtime_error("Failed to upload '{}' to S3: {}", Key, Result.Error);
- }
- }
- else
- {
- BasicFile File(SourcePath, BasicFile::Mode::kRead);
- S3Result Result = Client.PutObject(Key, File.ReadAll());
- if (!Result.IsSuccess())
- {
- throw zen::runtime_error("Failed to upload '{}' to S3: {}", Key, Result.Error);
- }
- }
- });
- }
+ void S3Storage::Get(ParallelWork& Work,
+ WorkerThreadPool& WorkerPool,
+ const IoHash& Hash,
+ uint64_t Size,
+ const std::filesystem::path& DestinationPath)
+ {
+ std::string Key = m_KeyPrefix + "/cas/" + fmt::format("{}", Hash);
- virtual void Get(ParallelWork& Work,
- WorkerThreadPool& WorkerPool,
- const IoHash& Hash,
- uint64_t Size,
- const std::filesystem::path& DestinationPath)
+ if (Size >= (m_MultipartChunkSize + (m_MultipartChunkSize / 4)))
{
- std::string Key = m_KeyPrefix + "/cas/" + fmt::format("{}", Hash);
-
- if (Size >= (m_MultipartChunkSize + (m_MultipartChunkSize / 4)))
+ class WorkData
{
- class WorkData
- {
- public:
- WorkData(const std::filesystem::path& DestPath, uint64_t Size) : m_DestFile(DestPath, BasicFile::Mode::kTruncate)
- {
- PrepareFileForScatteredWrite(m_DestFile.Handle(), Size);
- }
- ~WorkData() { m_DestFile.Flush(); }
- void Write(const void* Data, uint64_t Size, uint64_t Offset) { m_DestFile.Write(Data, Size, Offset); }
-
- private:
- BasicFile m_DestFile;
- };
-
- std::shared_ptr<WorkData> Data = std::make_shared<WorkData>(DestinationPath, Size);
-
- uint64_t Offset = 0;
- while (Offset < Size)
+ public:
+ WorkData(const std::filesystem::path& DestPath, uint64_t Size) : m_DestFile(DestPath, BasicFile::Mode::kTruncate)
{
- uint64_t ChunkSize = std::min<uint64_t>(m_MultipartChunkSize, Size - Offset);
-
- Work.ScheduleWork(WorkerPool, [this, Key = Key, Offset, ChunkSize, Data](std::atomic<bool>& AbortFlag) {
- if (AbortFlag)
- {
- return;
- }
- S3GetObjectResult Chunk = m_Client->GetObjectRange(Key, Offset, ChunkSize);
- if (!Chunk.IsSuccess())
- {
- throw zen::runtime_error("Failed to download '{}' bytes [{}-{}] from S3: {}",
- Key,
- Offset,
- Offset + ChunkSize - 1,
- Chunk.Error);
- }
-
- Data->Write(Chunk.Content.GetData(), Chunk.Content.GetSize(), Offset);
- });
- Offset += ChunkSize;
+ PrepareFileForScatteredWrite(m_DestFile.Handle(), Size);
}
- }
- else
- {
- Work.ScheduleWork(
- WorkerPool,
- [this, Key = Key, DestinationPath = std::filesystem::path(DestinationPath)](std::atomic<bool>& AbortFlag) {
- if (AbortFlag)
- {
- return;
- }
- S3GetObjectResult Chunk = m_Client->GetObject(Key, m_TempDir);
- if (!Chunk.IsSuccess())
- {
- throw zen::runtime_error("Failed to download '{}' from S3: {}", Key, Chunk.Error);
- }
+ ~WorkData() { m_DestFile.Flush(); }
+ void Write(const void* Data, uint64_t Size, uint64_t Offset) { m_DestFile.Write(Data, Size, Offset); }
- if (IoBufferFileReference FileRef; Chunk.Content.GetFileReference(FileRef))
- {
- std::error_code Ec;
- std::filesystem::path ChunkPath = PathFromHandle(FileRef.FileHandle, Ec);
- if (Ec)
- {
- WriteFile(DestinationPath, Chunk.Content);
- }
- else
- {
- Chunk.Content.SetDeleteOnClose(false);
- Chunk.Content = {};
- RenameFile(ChunkPath, DestinationPath, Ec);
- if (Ec)
- {
- Chunk.Content = IoBufferBuilder::MakeFromFile(ChunkPath);
- Chunk.Content.SetDeleteOnClose(true);
- WriteFile(DestinationPath, Chunk.Content);
- }
- }
- }
- else
- {
- WriteFile(DestinationPath, Chunk.Content);
- }
- });
- }
- }
+ private:
+ BasicFile m_DestFile;
+ };
- virtual void Touch(ParallelWork& Work, WorkerThreadPool& WorkerPool, const IoHash& Hash) override
- {
- Work.ScheduleWork(WorkerPool, [this, Hash = IoHash(Hash)](std::atomic<bool>& AbortFlag) {
- if (AbortFlag.load())
- {
- return;
- }
- std::string Key = m_KeyPrefix + "/cas/" + fmt::format("{}", Hash);
- S3Result Result = m_Client->Touch(Key);
- if (!Result.IsSuccess())
- {
- throw zen::runtime_error("Failed to touch '{}' in S3: {}", Key, Result.Error);
- }
- });
- }
+ std::shared_ptr<WorkData> Data = std::make_shared<WorkData>(DestinationPath, Size);
- virtual void Delete(ParallelWork& Work, WorkerThreadPool& WorkerPool) override
- {
- std::string Prefix = m_KeyPrefix + "/";
- S3ListObjectsResult ListResult = m_Client->ListObjects(Prefix);
- if (!ListResult.IsSuccess())
+ uint64_t Offset = 0;
+ while (Offset < Size)
{
- throw zen::runtime_error("Failed to list S3 objects for deletion under '{}': {}", Prefix, ListResult.Error);
- }
- for (const S3ObjectInfo& Obj : ListResult.Objects)
- {
- Work.ScheduleWork(WorkerPool, [this, Key = Obj.Key](std::atomic<bool>& AbortFlag) {
- if (AbortFlag.load())
+ uint64_t ChunkSize = std::min<uint64_t>(m_MultipartChunkSize, Size - Offset);
+
+ Work.ScheduleWork(WorkerPool, [this, Key = Key, Offset, ChunkSize, Data](std::atomic<bool>& AbortFlag) {
+ if (AbortFlag)
{
return;
}
- S3Result DelResult = m_Client->DeleteObject(Key);
- if (!DelResult.IsSuccess())
+ S3GetObjectResult Chunk = m_Client.GetObjectRange(Key, Offset, ChunkSize);
+ if (!Chunk.IsSuccess())
{
- throw zen::runtime_error("Failed to delete S3 object '{}': {}", Key, DelResult.Error);
+ throw zen::runtime_error("Failed to download '{}' bytes [{}-{}] from S3: {}",
+ Key,
+ Offset,
+ Offset + ChunkSize - 1,
+ Chunk.Error);
}
+
+ Data->Write(Chunk.Content.GetData(), Chunk.Content.GetSize(), Offset);
});
+ Offset += ChunkSize;
}
}
-
- private:
- std::unique_ptr<S3Client> CreateS3Client() const
+ else
{
- S3ClientOptions Options;
- Options.BucketName = m_Bucket;
- Options.Region = m_Region;
+ Work.ScheduleWork(WorkerPool,
+ [this, Key = Key, DestinationPath = std::filesystem::path(DestinationPath)](std::atomic<bool>& AbortFlag) {
+ if (AbortFlag)
+ {
+ return;
+ }
+ S3GetObjectResult Chunk = m_Client.GetObject(Key, m_TempDir);
+ if (!Chunk.IsSuccess())
+ {
+ throw zen::runtime_error("Failed to download '{}' from S3: {}", Key, Chunk.Error);
+ }
- CbObjectView Settings = m_Options["settings"].AsObjectView();
- std::string_view Endpoint = Settings["endpoint"].AsString();
- if (!Endpoint.empty())
- {
- Options.Endpoint = std::string(Endpoint);
- Options.PathStyle = Settings["path-style"].AsBool();
- }
+ if (IoBufferFileReference FileRef; Chunk.Content.GetFileReference(FileRef))
+ {
+ std::error_code Ec;
+ std::filesystem::path ChunkPath = PathFromHandle(FileRef.FileHandle, Ec);
+ if (Ec)
+ {
+ WriteFile(DestinationPath, Chunk.Content);
+ }
+ else
+ {
+ Chunk.Content.SetDeleteOnClose(false);
+ Chunk.Content = {};
+ RenameFile(ChunkPath, DestinationPath, Ec);
+ if (Ec)
+ {
+ Chunk.Content = IoBufferBuilder::MakeFromFile(ChunkPath);
+ Chunk.Content.SetDeleteOnClose(true);
+ WriteFile(DestinationPath, Chunk.Content);
+ }
+ }
+ }
+ else
+ {
+ WriteFile(DestinationPath, Chunk.Content);
+ }
+ });
+ }
+ }
- if (m_CredentialProvider)
+ void S3Storage::Touch(ParallelWork& Work, WorkerThreadPool& WorkerPool, const IoHash& Hash)
+ {
+ Work.ScheduleWork(WorkerPool, [this, Hash = IoHash(Hash)](std::atomic<bool>& AbortFlag) {
+ if (AbortFlag.load())
{
- Options.CredentialProvider = m_CredentialProvider;
+ return;
}
- else
+ std::string Key = m_KeyPrefix + "/cas/" + fmt::format("{}", Hash);
+ S3Result Result = m_Client.Touch(Key);
+ if (!Result.IsSuccess())
{
- Options.Credentials = m_Credentials;
+ throw zen::runtime_error("Failed to touch '{}' in S3: {}", Key, Result.Error);
}
+ });
+ }
- Options.HttpSettings.MaximumInMemoryDownloadSize = 16u * 1024u;
-
- return std::make_unique<S3Client>(Options);
+ void S3Storage::Delete(ParallelWork& Work, WorkerThreadPool& WorkerPool)
+ {
+ std::string ModulePrefix = m_KeyPrefix + "/";
+ S3ListObjectsResult ListResult = m_Client.ListObjects(ModulePrefix);
+ if (!ListResult.IsSuccess())
+ {
+ throw zen::runtime_error("Failed to list S3 objects for deletion under '{}': {}", ModulePrefix, ListResult.Error);
}
+ for (const S3ObjectInfo& Obj : ListResult.Objects)
+ {
+ Work.ScheduleWork(WorkerPool, [this, Key = Obj.Key](std::atomic<bool>& AbortFlag) {
+ if (AbortFlag.load())
+ {
+ return;
+ }
+ S3Result DelResult = m_Client.DeleteObject(Key);
+ if (!DelResult.IsSuccess())
+ {
+ throw zen::runtime_error("Failed to delete S3 object '{}': {}", Key, DelResult.Error);
+ }
+ });
+ }
+ }
- static constexpr uint64_t DefaultMultipartChunkSize = 32u * 1024u * 1024u;
-
- std::string m_KeyPrefix;
- CbObject m_Options;
- std::string m_Bucket;
- std::string m_Region;
- SigV4Credentials m_Credentials;
- Ref<ImdsCredentialProvider> m_CredentialProvider;
- std::unique_ptr<S3Client> m_Client;
- std::filesystem::path m_TempDir;
- uint64_t m_MultipartChunkSize = DefaultMultipartChunkSize;
- };
-
-} // namespace hydration_impl
+ ///////////////////////////////////////////////////////////////////////
+ // IncrementalHydrator: the only HydrationStrategyBase implementation.
+ // Holds a per-module StorageBase and threading context; drives the
+ // hydrate/dehydrate algorithm.
-using namespace hydration_impl;
+ class IncrementalHydrator : public HydrationStrategyBase
+ {
+ public:
+ IncrementalHydrator(const HydrationConfig& Config, std::unique_ptr<StorageBase> Storage);
+ virtual ~IncrementalHydrator() override;
-///////////////////////////////////////////////////////////////////////////
+ virtual void Dehydrate(const CbObject& CachedState) override;
+ virtual CbObject Hydrate() override;
+ virtual void Obliterate() override;
-class IncrementalHydrator : public HydrationStrategyBase
-{
-public:
- IncrementalHydrator(std::unique_ptr<StorageBase>&& Storage);
- virtual ~IncrementalHydrator() override;
- virtual void Configure(const HydrationConfig& Config) override;
- virtual void Dehydrate(const CbObject& CachedState) override;
- virtual CbObject Hydrate() override;
- virtual void Obliterate() override;
+ private:
+ struct Entry
+ {
+ std::filesystem::path RelativePath;
+ uint64_t Size;
+ uint64_t ModTick;
+ IoHash Hash;
+ };
-private:
- struct Entry
- {
- std::filesystem::path RelativePath;
- uint64_t Size;
- uint64_t ModTick;
- IoHash Hash;
+ std::unique_ptr<StorageBase> m_Storage;
+ HydrationConfig m_Config;
+ WorkerThreadPool m_FallbackWorkPool;
+ std::atomic<bool> m_FallbackAbortFlag{false};
+ std::atomic<bool> m_FallbackPauseFlag{false};
+ HydrationConfig::ThreadingOptions m_Threading{.WorkerPool = &m_FallbackWorkPool,
+ .AbortFlag = &m_FallbackAbortFlag,
+ .PauseFlag = &m_FallbackPauseFlag};
};
- std::unique_ptr<StorageBase> m_Storage;
- HydrationConfig m_Config;
- WorkerThreadPool m_FallbackWorkPool;
- std::atomic<bool> m_FallbackAbortFlag{false};
- std::atomic<bool> m_FallbackPauseFlag{false};
- HydrationConfig::ThreadingOptions m_Threading{.WorkerPool = &m_FallbackWorkPool,
- .AbortFlag = &m_FallbackAbortFlag,
- .PauseFlag = &m_FallbackPauseFlag};
-};
+ ///////////////////////////////////////////////////////////////////////
+ // IncrementalHydrator implementations
-IncrementalHydrator::IncrementalHydrator(std::unique_ptr<StorageBase>&& Storage) : m_Storage(std::move(Storage)), m_FallbackWorkPool(0)
-{
-}
-
-IncrementalHydrator::~IncrementalHydrator()
-{
- m_Storage.reset();
-}
-
-void
-IncrementalHydrator::Configure(const HydrationConfig& Config)
-{
- m_Config = Config;
- m_Storage->Configure(Config.ModuleId, Config.TempDir, Config.TargetSpecification, Config.Options);
- if (Config.Threading)
+ IncrementalHydrator::IncrementalHydrator(const HydrationConfig& Config, std::unique_ptr<StorageBase> Storage)
+ : m_Storage(std::move(Storage))
+ , m_Config(Config)
+ , m_FallbackWorkPool(0)
{
- m_Threading = *Config.Threading;
+ if (Config.Threading)
+ {
+ m_Threading = *Config.Threading;
+ }
}
-}
-void
-IncrementalHydrator::Dehydrate(const CbObject& CachedState)
-{
- Stopwatch Timer;
+ IncrementalHydrator::~IncrementalHydrator() { m_Storage.reset(); }
- const std::filesystem::path ServerStateDir = MakeSafeAbsolutePath(m_Config.ServerStateDir);
- try
+ void IncrementalHydrator::Dehydrate(const CbObject& CachedState)
{
- std::unordered_map<std::string, size_t> StateEntryLookup;
- std::vector<Entry> StateEntries;
- for (CbFieldView FieldView : CachedState["Files"].AsArrayView())
- {
- CbObjectView EntryView = FieldView.AsObjectView();
- std::filesystem::path RelativePath(EntryView["Path"].AsString());
- uint64_t Size = EntryView["Size"].AsUInt64();
- uint64_t ModTick = EntryView["ModTick"].AsUInt64();
- IoHash Hash = EntryView["Hash"].AsHash();
-
- StateEntryLookup.insert_or_assign(RelativePath.generic_string(), StateEntries.size());
- StateEntries.push_back(Entry{.RelativePath = RelativePath, .Size = Size, .ModTick = ModTick, .Hash = Hash});
- }
+ Stopwatch Timer;
- DirectoryContent DirContent;
- GetDirectoryContent(*m_Threading.WorkerPool,
- ServerStateDir,
- DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::Recursive |
- DirectoryContentFlags::IncludeFileSizes | DirectoryContentFlags::IncludeModificationTick,
- DirContent);
+ const std::filesystem::path ServerStateDir = MakeSafeAbsolutePath(m_Config.ServerStateDir);
+ try
+ {
+ std::unordered_map<std::string, size_t> StateEntryLookup;
+ std::vector<Entry> StateEntries;
+ for (CbFieldView FieldView : CachedState["Files"].AsArrayView())
+ {
+ CbObjectView EntryView = FieldView.AsObjectView();
+ std::filesystem::path RelativePath(EntryView["Path"].AsString());
+ uint64_t Size = EntryView["Size"].AsUInt64();
+ uint64_t ModTick = EntryView["ModTick"].AsUInt64();
+ IoHash Hash = EntryView["Hash"].AsHash();
+
+ StateEntryLookup.insert_or_assign(RelativePath.generic_string(), StateEntries.size());
+ StateEntries.push_back(Entry{.RelativePath = RelativePath, .Size = Size, .ModTick = ModTick, .Hash = Hash});
+ }
- ZEN_INFO("Dehydrating module '{}' from folder '{}'. {} ({}) files",
- m_Config.ModuleId,
- m_Config.ServerStateDir,
- DirContent.Files.size(),
- NiceBytes(std::accumulate(DirContent.FileSizes.begin(), DirContent.FileSizes.end(), uint64_t(0))));
+ DirectoryContent DirContent;
+ GetDirectoryContent(*m_Threading.WorkerPool,
+ ServerStateDir,
+ DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::Recursive |
+ DirectoryContentFlags::IncludeFileSizes | DirectoryContentFlags::IncludeModificationTick,
+ DirContent);
- std::vector<Entry> Entries;
- Entries.resize(DirContent.Files.size());
+ ZEN_INFO("Dehydrating module '{}' from folder '{}'. {} ({}) files",
+ m_Config.ModuleId,
+ m_Config.ServerStateDir,
+ DirContent.Files.size(),
+ NiceBytes(std::accumulate(DirContent.FileSizes.begin(), DirContent.FileSizes.end(), uint64_t(0))));
- uint64_t TotalBytes = 0;
- uint64_t TotalFiles = 0;
- uint64_t HashedFiles = 0;
- uint64_t HashedBytes = 0;
+ std::vector<Entry> Entries;
+ Entries.resize(DirContent.Files.size());
- std::unordered_set<IoHash> ExistsLookup;
+ uint64_t TotalBytes = 0;
+ uint64_t TotalFiles = 0;
+ uint64_t HashedFiles = 0;
+ uint64_t HashedBytes = 0;
- {
- ParallelWork Work(*m_Threading.AbortFlag, *m_Threading.PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
+ std::unordered_set<IoHash> ExistsLookup;
- for (size_t FileIndex = 0; FileIndex < DirContent.Files.size(); FileIndex++)
{
- const std::filesystem::path AbsPath = MakeSafeAbsolutePath(DirContent.Files[FileIndex]);
- if (AbsPath.filename() == "reserve.gc")
- {
- continue;
- }
- const std::filesystem::path RelativePath = FastRelativePath(ServerStateDir, DirContent.Files[FileIndex]);
- if (*RelativePath.begin() == ".sentry-native")
- {
- continue;
- }
- if (RelativePath == ".lock")
- {
- continue;
- }
+ ParallelWork Work(*m_Threading.AbortFlag, *m_Threading.PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
- Entry& CurrentEntry = Entries[TotalFiles];
- CurrentEntry.RelativePath = RelativePath;
- CurrentEntry.Size = DirContent.FileSizes[FileIndex];
- CurrentEntry.ModTick = DirContent.FileModificationTicks[FileIndex];
-
- bool FoundHash = false;
- if (auto KnownIt = StateEntryLookup.find(CurrentEntry.RelativePath.generic_string()); KnownIt != StateEntryLookup.end())
+ for (size_t FileIndex = 0; FileIndex < DirContent.Files.size(); FileIndex++)
{
- const Entry& StateEntry = StateEntries[KnownIt->second];
- if (StateEntry.Size == CurrentEntry.Size && StateEntry.ModTick == CurrentEntry.ModTick)
+ const std::filesystem::path AbsPath = MakeSafeAbsolutePath(DirContent.Files[FileIndex]);
+ if (AbsPath.filename() == "reserve.gc")
{
- CurrentEntry.Hash = StateEntry.Hash;
- FoundHash = true;
+ continue;
+ }
+ const std::filesystem::path RelativePath = FastRelativePath(ServerStateDir, DirContent.Files[FileIndex]);
+ if (*RelativePath.begin() == ".sentry-native")
+ {
+ continue;
+ }
+ if (RelativePath == ".lock")
+ {
+ continue;
}
- }
-
- if (!FoundHash)
- {
- Work.ScheduleWork(*m_Threading.WorkerPool, [AbsPath, EntryIndex = TotalFiles, &Entries](std::atomic<bool>& AbortFlag) {
- if (AbortFlag)
- {
- return;
- }
- Entry& CurrentEntry = Entries[EntryIndex];
+ Entry& CurrentEntry = Entries[TotalFiles];
+ CurrentEntry.RelativePath = RelativePath;
+ CurrentEntry.Size = DirContent.FileSizes[FileIndex];
+ CurrentEntry.ModTick = DirContent.FileModificationTicks[FileIndex];
- bool FoundHash = false;
- if (AbsPath.extension().empty())
+ bool FoundHash = false;
+ if (auto KnownIt = StateEntryLookup.find(CurrentEntry.RelativePath.generic_string()); KnownIt != StateEntryLookup.end())
+ {
+ const Entry& StateEntry = StateEntries[KnownIt->second];
+ if (StateEntry.Size == CurrentEntry.Size && StateEntry.ModTick == CurrentEntry.ModTick)
{
- auto It = CurrentEntry.RelativePath.begin();
- if (It != CurrentEntry.RelativePath.end() && It->filename().string().ends_with("cas"))
- {
- IoHash RawHash;
- uint64_t RawSize;
- CompressedBuffer Compressed =
- CompressedBuffer::FromCompressed(SharedBuffer(IoBufferBuilder::MakeFromFile(AbsPath)),
- RawHash,
- RawSize);
- if (Compressed)
- {
- // We compose a meta-hash since taking the RawHash might collide with an existing
- // non-compressed file with the same content The collision is unlikely except if the
- // compressed data is zero bytes causing RawHash to be the same as an empty file.
- IoHashStream Hasher;
- Hasher.Append(RawHash.Hash, sizeof(RawHash.Hash));
- Hasher.Append(&CurrentEntry.Size, sizeof(CurrentEntry.Size));
- CurrentEntry.Hash = Hasher.GetHash();
- FoundHash = true;
- }
- }
+ CurrentEntry.Hash = StateEntry.Hash;
+ FoundHash = true;
}
+ }
- if (!FoundHash)
- {
- CurrentEntry.Hash = IoHash::HashBuffer(IoBufferBuilder::MakeFromFile(AbsPath));
- }
- });
- HashedFiles++;
- HashedBytes += CurrentEntry.Size;
+ if (!FoundHash)
+ {
+ Work.ScheduleWork(*m_Threading.WorkerPool,
+ [AbsPath, EntryIndex = TotalFiles, &Entries](std::atomic<bool>& AbortFlag) {
+ if (AbortFlag)
+ {
+ return;
+ }
+
+ Entry& CurrentEntry = Entries[EntryIndex];
+
+ bool FoundHash = false;
+ if (AbsPath.extension().empty())
+ {
+ auto It = CurrentEntry.RelativePath.begin();
+ if (It != CurrentEntry.RelativePath.end() && It->filename().string().ends_with("cas"))
+ {
+ IoHash RawHash;
+ uint64_t RawSize;
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressed(
+ SharedBuffer(IoBufferBuilder::MakeFromFile(AbsPath)),
+ RawHash,
+ RawSize);
+ if (Compressed)
+ {
+ // We compose a meta-hash since taking the RawHash might collide with an
+ // existing non-compressed file with the same content The collision is
+ // unlikely except if the compressed data is zero bytes causing RawHash
+ // to be the same as an empty file.
+ IoHashStream Hasher;
+ Hasher.Append(RawHash.Hash, sizeof(RawHash.Hash));
+ Hasher.Append(&CurrentEntry.Size, sizeof(CurrentEntry.Size));
+ CurrentEntry.Hash = Hasher.GetHash();
+ FoundHash = true;
+ }
+ }
+ }
+
+ if (!FoundHash)
+ {
+ CurrentEntry.Hash = IoHash::HashBuffer(IoBufferBuilder::MakeFromFile(AbsPath));
+ }
+ });
+ HashedFiles++;
+ HashedBytes += CurrentEntry.Size;
+ }
+ TotalFiles++;
+ TotalBytes += CurrentEntry.Size;
}
- TotalFiles++;
- TotalBytes += CurrentEntry.Size;
- }
- std::vector<IoHash> ExistingEntries = m_Storage->List();
- ExistsLookup.insert(ExistingEntries.begin(), ExistingEntries.end());
+ std::vector<IoHash> ExistingEntries = m_Storage->List();
+ ExistsLookup.insert(ExistingEntries.begin(), ExistingEntries.end());
- Work.Wait();
-
- Entries.resize(TotalFiles);
- }
+ Work.Wait();
- uint64_t UploadedFiles = 0;
- uint64_t UploadedBytes = 0;
- uint64_t TouchedFiles = 0;
- uint64_t TouchedBytes = 0;
- {
- ParallelWork Work(*m_Threading.AbortFlag, *m_Threading.PauseFlag, WorkerThreadPool::EMode::DisableBacklog);
+ Entries.resize(TotalFiles);
+ }
- for (const Entry& CurrentEntry : Entries)
+ uint64_t UploadedFiles = 0;
+ uint64_t UploadedBytes = 0;
+ uint64_t TouchedFiles = 0;
+ uint64_t TouchedBytes = 0;
{
- if (!ExistsLookup.contains(CurrentEntry.Hash))
- {
- m_Storage->Put(Work,
- *m_Threading.WorkerPool,
- CurrentEntry.Hash,
- CurrentEntry.Size,
- MakeSafeAbsolutePath(ServerStateDir / CurrentEntry.RelativePath));
- UploadedFiles++;
- UploadedBytes += CurrentEntry.Size;
- }
- else
+ ParallelWork Work(*m_Threading.AbortFlag, *m_Threading.PauseFlag, WorkerThreadPool::EMode::DisableBacklog);
+
+ for (const Entry& CurrentEntry : Entries)
{
- // Refresh the backend's modification time so lifecycle-expiration policies
- // do not evict CAS entries that are still referenced by this module.
- m_Storage->Touch(Work, *m_Threading.WorkerPool, CurrentEntry.Hash);
- TouchedFiles++;
- TouchedBytes += CurrentEntry.Size;
+ if (!ExistsLookup.contains(CurrentEntry.Hash))
+ {
+ m_Storage->Put(Work,
+ *m_Threading.WorkerPool,
+ CurrentEntry.Hash,
+ CurrentEntry.Size,
+ MakeSafeAbsolutePath(ServerStateDir / CurrentEntry.RelativePath));
+ UploadedFiles++;
+ UploadedBytes += CurrentEntry.Size;
+ }
+ else
+ {
+ // Refresh the backend's modification time so lifecycle-expiration policies
+ // do not evict CAS entries that are still referenced by this module.
+ m_Storage->Touch(Work, *m_Threading.WorkerPool, CurrentEntry.Hash);
+ TouchedFiles++;
+ TouchedBytes += CurrentEntry.Size;
+ }
}
- }
- Work.Wait();
- uint64_t UploadTimeMs = Timer.GetElapsedTimeMs();
-
- UtcTime Now = UtcTime::Now();
- std::string UploadTimeUtc = fmt::format("{:04d}-{:02d}-{:02d}T{:02d}:{:02d}:{:02d}.{:03d}Z",
- Now.Tm.tm_year + 1900,
- Now.Tm.tm_mon + 1,
- Now.Tm.tm_mday,
- Now.Tm.tm_hour,
- Now.Tm.tm_min,
- Now.Tm.tm_sec,
- Now.Ms);
-
- CbObjectWriter Meta;
- Meta << "SourceFolder" << ServerStateDir.generic_string();
- Meta << "ModuleId" << m_Config.ModuleId;
- Meta << "HostName" << GetMachineName();
- Meta << "UploadTimeUtc" << UploadTimeUtc;
- Meta << "UploadDurationMs" << UploadTimeMs;
- Meta << "TotalSizeBytes" << TotalBytes;
- Meta << "StorageSettings" << m_Storage->GetSettings();
-
- Meta.BeginArray("Files");
- for (const Entry& CurrentEntry : Entries)
- {
- Meta.BeginObject();
+ Work.Wait();
+ uint64_t UploadTimeMs = Timer.GetElapsedTimeMs();
+
+ UtcTime Now = UtcTime::Now();
+ std::string UploadTimeUtc = fmt::format("{:04d}-{:02d}-{:02d}T{:02d}:{:02d}:{:02d}.{:03d}Z",
+ Now.Tm.tm_year + 1900,
+ Now.Tm.tm_mon + 1,
+ Now.Tm.tm_mday,
+ Now.Tm.tm_hour,
+ Now.Tm.tm_min,
+ Now.Tm.tm_sec,
+ Now.Ms);
+
+ CbObjectWriter Meta;
+ Meta << "SourceFolder" << ServerStateDir.generic_string();
+ Meta << "ModuleId" << m_Config.ModuleId;
+ Meta << "HostName" << GetMachineName();
+ Meta << "UploadTimeUtc" << UploadTimeUtc;
+ Meta << "UploadDurationMs" << UploadTimeMs;
+ Meta << "TotalSizeBytes" << TotalBytes;
+ Meta << "StorageSettings" << m_Storage->GetSettings();
+
+ Meta.BeginArray("Files");
+ for (const Entry& CurrentEntry : Entries)
{
- Meta << "Path" << CurrentEntry.RelativePath.generic_string();
- Meta << "Size" << CurrentEntry.Size;
- Meta << "ModTick" << CurrentEntry.ModTick;
- Meta << "Hash" << CurrentEntry.Hash;
+ Meta.BeginObject();
+ {
+ Meta << "Path" << CurrentEntry.RelativePath.generic_string();
+ Meta << "Size" << CurrentEntry.Size;
+ Meta << "ModTick" << CurrentEntry.ModTick;
+ Meta << "Hash" << CurrentEntry.Hash;
+ }
+ Meta.EndObject();
}
- Meta.EndObject();
- }
- Meta.EndArray();
-
- m_Storage->SaveMetadata(Meta.Save());
- }
+ Meta.EndArray();
- ZEN_DEBUG("Cleaning server state '{}'", m_Config.ServerStateDir);
- CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, ServerStateDir);
-
- ZEN_INFO(
- "Dehydration of module '{}' completed from folder '{}'. Hashed {} ({}). Uploaded {} ({}). Touched {} ({}). Total {} ({}) in {}",
- m_Config.ModuleId,
- m_Config.ServerStateDir,
- HashedFiles,
- NiceBytes(HashedBytes),
- UploadedFiles,
- NiceBytes(UploadedBytes),
- TouchedFiles,
- NiceBytes(TouchedBytes),
- TotalFiles,
- NiceBytes(TotalBytes),
- NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
- }
- catch (const std::exception& Ex)
- {
- ZEN_WARN("Dehydration of module '{}' failed: {}. Leaving server state '{}'", m_Config.ModuleId, Ex.what(), m_Config.ServerStateDir);
- }
-}
+ m_Storage->SaveMetadata(Meta.Save());
+ }
-CbObject
-IncrementalHydrator::Hydrate()
-{
- Stopwatch Timer;
+ ZEN_DEBUG("Cleaning server state '{}'", m_Config.ServerStateDir);
+ CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, ServerStateDir);
- const std::filesystem::path ServerStateDir = MakeSafeAbsolutePath(m_Config.ServerStateDir);
- const std::filesystem::path TempDir = MakeSafeAbsolutePath(m_Config.TempDir);
- try
- {
- CbObject Meta = m_Storage->LoadMetadata();
- if (!Meta)
+ ZEN_INFO(
+ "Dehydration of module '{}' completed from folder '{}'. Hashed {} ({}). Uploaded {} ({}). Touched {} ({}). Total {} ({}) "
+ "in {}",
+ m_Config.ModuleId,
+ m_Config.ServerStateDir,
+ HashedFiles,
+ NiceBytes(HashedBytes),
+ UploadedFiles,
+ NiceBytes(UploadedBytes),
+ TouchedFiles,
+ NiceBytes(TouchedBytes),
+ TotalFiles,
+ NiceBytes(TotalBytes),
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ }
+ catch (const std::exception& Ex)
{
- ZEN_INFO("No dehydrated state for module {} found, cleaning server state: '{}'", m_Config.ModuleId, m_Config.ServerStateDir);
- CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, ServerStateDir);
- return CbObject();
+ ZEN_WARN("Dehydration of module '{}' failed: {}. Leaving server state '{}'",
+ m_Config.ModuleId,
+ Ex.what(),
+ m_Config.ServerStateDir);
}
+ }
- std::unordered_map<std::string, size_t> EntryLookup;
- std::vector<Entry> Entries;
- uint64_t TotalSize = 0;
+ CbObject IncrementalHydrator::Hydrate()
+ {
+ Stopwatch Timer;
- for (CbFieldView FieldView : Meta["Files"])
+ const std::filesystem::path ServerStateDir = MakeSafeAbsolutePath(m_Config.ServerStateDir);
+ const std::filesystem::path TempDir = MakeSafeAbsolutePath(m_Config.TempDir);
+ try
{
- CbObjectView EntryView = FieldView.AsObjectView();
- if (EntryView)
+ CbObject Meta = m_Storage->LoadMetadata();
+ if (!Meta)
{
- Entry NewEntry = {.RelativePath = std::filesystem::path(EntryView["Path"].AsString()),
- .Size = EntryView["Size"].AsUInt64(),
- .ModTick = EntryView["ModTick"].AsUInt64(),
- .Hash = EntryView["Hash"].AsHash()};
- TotalSize += NewEntry.Size;
- EntryLookup.insert_or_assign(NewEntry.RelativePath.generic_string(), Entries.size());
- Entries.emplace_back(std::move(NewEntry));
+ ZEN_INFO("No dehydrated state for module {} found, cleaning server state: '{}'",
+ m_Config.ModuleId,
+ m_Config.ServerStateDir);
+ CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, ServerStateDir);
+ return CbObject();
}
- }
- ZEN_INFO("Hydrating module '{}' to folder '{}'. {} ({}) files",
- m_Config.ModuleId,
- m_Config.ServerStateDir,
- Entries.size(),
- NiceBytes(TotalSize));
+ std::unordered_map<std::string, size_t> EntryLookup;
+ std::vector<Entry> Entries;
+ uint64_t TotalSize = 0;
+
+ for (CbFieldView FieldView : Meta["Files"])
+ {
+ CbObjectView EntryView = FieldView.AsObjectView();
+ if (EntryView)
+ {
+ Entry NewEntry = {.RelativePath = std::filesystem::path(EntryView["Path"].AsString()),
+ .Size = EntryView["Size"].AsUInt64(),
+ .ModTick = EntryView["ModTick"].AsUInt64(),
+ .Hash = EntryView["Hash"].AsHash()};
+ TotalSize += NewEntry.Size;
+ EntryLookup.insert_or_assign(NewEntry.RelativePath.generic_string(), Entries.size());
+ Entries.emplace_back(std::move(NewEntry));
+ }
+ }
- m_Storage->ParseSettings(Meta["StorageSettings"].AsObjectView());
+ ZEN_INFO("Hydrating module '{}' to folder '{}'. {} ({}) files",
+ m_Config.ModuleId,
+ m_Config.ServerStateDir,
+ Entries.size(),
+ NiceBytes(TotalSize));
- uint64_t DownloadedBytes = 0;
- uint64_t DownloadedFiles = 0;
+ m_Storage->ParseSettings(Meta["StorageSettings"].AsObjectView());
- {
- ParallelWork Work(*m_Threading.AbortFlag, *m_Threading.PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
+ uint64_t DownloadedBytes = 0;
+ uint64_t DownloadedFiles = 0;
- for (const Entry& CurrentEntry : Entries)
{
- std::filesystem::path Path = MakeSafeAbsolutePath(TempDir / CurrentEntry.RelativePath);
- CreateDirectories(Path.parent_path());
- m_Storage->Get(Work, *m_Threading.WorkerPool, CurrentEntry.Hash, CurrentEntry.Size, Path);
- DownloadedBytes += CurrentEntry.Size;
- DownloadedFiles++;
+ ParallelWork Work(*m_Threading.AbortFlag, *m_Threading.PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
+
+ for (const Entry& CurrentEntry : Entries)
+ {
+ std::filesystem::path Path = MakeSafeAbsolutePath(TempDir / CurrentEntry.RelativePath);
+ CreateDirectories(Path.parent_path());
+ m_Storage->Get(Work, *m_Threading.WorkerPool, CurrentEntry.Hash, CurrentEntry.Size, Path);
+ DownloadedBytes += CurrentEntry.Size;
+ DownloadedFiles++;
+ }
+
+ Work.Wait();
}
- Work.Wait();
- }
+ // Downloaded successfully - swap into ServerStateDir
+ ZEN_DEBUG("Cleaning server state '{}'", m_Config.ServerStateDir);
+ CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, ServerStateDir);
- // Downloaded successfully - swap into ServerStateDir
- ZEN_DEBUG("Cleaning server state '{}'", m_Config.ServerStateDir);
- CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, ServerStateDir);
+ // If the two paths share at least one common component they are on the same drive/volume
+ // and atomic renames will succeed. Otherwise fall back to a full copy.
+ auto [ItTmp, ItState] = std::mismatch(TempDir.begin(), TempDir.end(), ServerStateDir.begin(), ServerStateDir.end());
+ if (ItTmp != TempDir.begin())
+ {
+ DirectoryContent DirContent;
+ GetDirectoryContent(*m_Threading.WorkerPool,
+ TempDir,
+ DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::IncludeDirs,
+ DirContent);
- // If the two paths share at least one common component they are on the same drive/volume
- // and atomic renames will succeed. Otherwise fall back to a full copy.
- auto [ItTmp, ItState] = std::mismatch(TempDir.begin(), TempDir.end(), ServerStateDir.begin(), ServerStateDir.end());
- if (ItTmp != TempDir.begin())
- {
+ for (const std::filesystem::path& AbsPath : DirContent.Directories)
+ {
+ std::filesystem::path Dest = MakeSafeAbsolutePath(ServerStateDir / AbsPath.filename());
+ std::error_code Ec = RenameDirectoryWithRetry(AbsPath, Dest);
+ if (Ec)
+ {
+ throw std::system_error(Ec, fmt::format("Failed to rename directory from '{}' to '{}'", AbsPath, Dest));
+ }
+ }
+ for (const std::filesystem::path& AbsPath : DirContent.Files)
+ {
+ std::filesystem::path Dest = MakeSafeAbsolutePath(ServerStateDir / AbsPath.filename());
+ std::error_code Ec = RenameFileWithRetry(AbsPath, Dest);
+ if (Ec)
+ {
+ throw std::system_error(Ec, fmt::format("Failed to rename file from '{}' to '{}'", AbsPath, Dest));
+ }
+ }
+
+ ZEN_DEBUG("Cleaning temp dir '{}'", m_Config.TempDir);
+ CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, TempDir);
+ }
+ else
+ {
+ // Slow path: TempDir and ServerStateDir are on different filesystems, so rename
+ // would fail. Copy the tree instead and clean up the temp files afterwards.
+ ZEN_DEBUG("TempDir and ServerStateDir are on different filesystems - using CopyTree");
+ CopyTree(TempDir, ServerStateDir, {.EnableClone = true});
+ ZEN_DEBUG("Cleaning temp dir '{}'", m_Config.TempDir);
+ CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, TempDir);
+ }
+
+ // TODO: This could perhaps be done more efficently, but ok for now
DirectoryContent DirContent;
GetDirectoryContent(*m_Threading.WorkerPool,
- TempDir,
- DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::IncludeDirs,
+ ServerStateDir,
+ DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::Recursive |
+ DirectoryContentFlags::IncludeFileSizes | DirectoryContentFlags::IncludeModificationTick,
DirContent);
- for (const std::filesystem::path& AbsPath : DirContent.Directories)
+ CbObjectWriter HydrateState;
+ HydrateState.BeginArray("Files");
+ for (size_t FileIndex = 0; FileIndex < DirContent.Files.size(); FileIndex++)
{
- std::filesystem::path Dest = MakeSafeAbsolutePath(ServerStateDir / AbsPath.filename());
- std::error_code Ec = RenameDirectoryWithRetry(AbsPath, Dest);
- if (Ec)
+ std::filesystem::path RelativePath = FastRelativePath(ServerStateDir, DirContent.Files[FileIndex]);
+
+ if (auto It = EntryLookup.find(RelativePath.generic_string()); It != EntryLookup.end())
{
- throw std::system_error(Ec, fmt::format("Failed to rename directory from '{}' to '{}'", AbsPath, Dest));
+ HydrateState.BeginObject();
+ {
+ HydrateState << "Path" << RelativePath.generic_string();
+ HydrateState << "Size" << DirContent.FileSizes[FileIndex];
+ HydrateState << "ModTick" << DirContent.FileModificationTicks[FileIndex];
+ HydrateState << "Hash" << Entries[It->second].Hash;
+ }
+ HydrateState.EndObject();
}
- }
- for (const std::filesystem::path& AbsPath : DirContent.Files)
- {
- std::filesystem::path Dest = MakeSafeAbsolutePath(ServerStateDir / AbsPath.filename());
- std::error_code Ec = RenameFileWithRetry(AbsPath, Dest);
- if (Ec)
+ else
{
- throw std::system_error(Ec, fmt::format("Failed to rename file from '{}' to '{}'", AbsPath, Dest));
+ ZEN_ASSERT(false);
}
}
+ HydrateState.EndArray();
- ZEN_DEBUG("Cleaning temp dir '{}'", m_Config.TempDir);
- CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, TempDir);
+ CbObject StateObject = HydrateState.Save();
+
+ ZEN_INFO("Hydration of module '{}' complete to folder '{}'. {} ({}) files in {}",
+ m_Config.ModuleId,
+ m_Config.ServerStateDir,
+ DownloadedFiles,
+ NiceBytes(DownloadedBytes),
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+
+ return StateObject;
}
- else
+ catch (const std::exception& Ex)
{
- // Slow path: TempDir and ServerStateDir are on different filesystems, so rename
- // would fail. Copy the tree instead and clean up the temp files afterwards.
- ZEN_DEBUG("TempDir and ServerStateDir are on different filesystems - using CopyTree");
- CopyTree(TempDir, ServerStateDir, {.EnableClone = true});
+ ZEN_WARN("Hydration of module '{}' failed: {}. Cleaning server state '{}'",
+ m_Config.ModuleId,
+ Ex.what(),
+ m_Config.ServerStateDir);
+ CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, ServerStateDir);
ZEN_DEBUG("Cleaning temp dir '{}'", m_Config.TempDir);
CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, TempDir);
+ return {};
}
+ }
- // TODO: This could perhaps be done more efficently, but ok for now
- DirectoryContent DirContent;
- GetDirectoryContent(*m_Threading.WorkerPool,
- ServerStateDir,
- DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::Recursive |
- DirectoryContentFlags::IncludeFileSizes | DirectoryContentFlags::IncludeModificationTick,
- DirContent);
-
- CbObjectWriter HydrateState;
- HydrateState.BeginArray("Files");
- for (size_t FileIndex = 0; FileIndex < DirContent.Files.size(); FileIndex++)
- {
- std::filesystem::path RelativePath = FastRelativePath(ServerStateDir, DirContent.Files[FileIndex]);
+ void IncrementalHydrator::Obliterate()
+ {
+ const std::filesystem::path ServerStateDir = MakeSafeAbsolutePath(m_Config.ServerStateDir);
+ const std::filesystem::path TempDir = MakeSafeAbsolutePath(m_Config.TempDir);
- if (auto It = EntryLookup.find(RelativePath.generic_string()); It != EntryLookup.end())
- {
- HydrateState.BeginObject();
- {
- HydrateState << "Path" << RelativePath.generic_string();
- HydrateState << "Size" << DirContent.FileSizes[FileIndex];
- HydrateState << "ModTick" << DirContent.FileModificationTicks[FileIndex];
- HydrateState << "Hash" << Entries[It->second].Hash;
- }
- HydrateState.EndObject();
- }
- else
- {
- ZEN_ASSERT(false);
- }
+ try
+ {
+ ParallelWork Work(*m_Threading.AbortFlag, *m_Threading.PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
+ m_Storage->Delete(Work, *m_Threading.WorkerPool);
+ Work.Wait();
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_WARN("Failed to delete backend storage for module '{}': {}. Proceeding with local cleanup.", m_Config.ModuleId, Ex.what());
}
- HydrateState.EndArray();
-
- CbObject StateObject = HydrateState.Save();
-
- ZEN_INFO("Hydration of module '{}' complete to folder '{}'. {} ({}) files in {}",
- m_Config.ModuleId,
- m_Config.ServerStateDir,
- DownloadedFiles,
- NiceBytes(DownloadedBytes),
- NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
- return StateObject;
- }
- catch (const std::exception& Ex)
- {
- ZEN_WARN("Hydration of module '{}' failed: {}. Cleaning server state '{}'", m_Config.ModuleId, Ex.what(), m_Config.ServerStateDir);
CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, ServerStateDir);
- ZEN_DEBUG("Cleaning temp dir '{}'", m_Config.TempDir);
CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, TempDir);
- return {};
}
-}
-void
-IncrementalHydrator::Obliterate()
+} // namespace hydration_impl
+
+///////////////////////////////////////////////////////////////////////////
+// HydrationBase subclasses - own hub-wide backend state, hand per-module
+// storages the exact inputs they need in CreateHydrator.
+
+class FileHydration : public HydrationBase
{
- const std::filesystem::path ServerStateDir = MakeSafeAbsolutePath(m_Config.ServerStateDir);
- const std::filesystem::path TempDir = MakeSafeAbsolutePath(m_Config.TempDir);
+public:
+ explicit FileHydration(const Configuration& Config);
+
+ virtual std::unique_ptr<HydrationStrategyBase> CreateHydrator(const HydrationConfig& Config) override;
+
+private:
+ std::filesystem::path m_StorageRoot;
+};
- try
+class S3Hydration : public HydrationBase
+{
+public:
+ explicit S3Hydration(const Configuration& Config);
+
+ virtual std::unique_ptr<HydrationStrategyBase> CreateHydrator(const HydrationConfig& Config) override;
+
+private:
+ std::string m_Bucket;
+ std::string m_Region;
+ std::string m_Endpoint;
+ bool m_PathStyle = false;
+ std::string m_KeyPrefixRoot;
+ SigV4Credentials m_Credentials;
+ Ref<ImdsCredentialProvider> m_CredentialProvider;
+ std::unique_ptr<S3Client> m_Client;
+ uint64_t m_DefaultMultipartChunkSize;
+};
+
+///////////////////////////////////////////////////////////////////////////
+// Implementations
+
+FileHydration::FileHydration(const Configuration& Config)
+{
+ if (!Config.TargetSpecification.empty())
{
- ParallelWork Work(*m_Threading.AbortFlag, *m_Threading.PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
- m_Storage->Delete(Work, *m_Threading.WorkerPool);
- Work.Wait();
+ m_StorageRoot = Utf8ToWide(Config.TargetSpecification.substr(hydration_impl::FileStorage::Prefix.length()));
+ if (m_StorageRoot.empty())
+ {
+ throw zen::runtime_error("Hydration config 'file' type requires a directory path");
+ }
}
- catch (const std::exception& Ex)
+ else
{
- ZEN_WARN("Failed to delete backend storage for module '{}': {}. Proceeding with local cleanup.", m_Config.ModuleId, Ex.what());
+ CbObjectView Settings = Config.Options["settings"].AsObjectView();
+ std::string_view Path = Settings["path"].AsString();
+ if (Path.empty())
+ {
+ throw zen::runtime_error("Hydration config 'file' type requires 'settings.path'");
+ }
+ m_StorageRoot = Utf8ToWide(std::string(Path));
}
-
- CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, ServerStateDir);
- CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, TempDir);
+ MakeSafeAbsolutePathInPlace(m_StorageRoot);
}
std::unique_ptr<HydrationStrategyBase>
-CreateHydrator(const HydrationConfig& Config)
+FileHydration::CreateHydrator(const HydrationConfig& Config)
{
- std::unique_ptr<StorageBase> Storage;
+ using namespace hydration_impl;
+ return std::make_unique<IncrementalHydrator>(Config, std::make_unique<FileStorage>(m_StorageRoot / Config.ModuleId));
+}
+S3Hydration::S3Hydration(const Configuration& Config)
+{
+ using namespace hydration_impl;
+
+ CbObjectView Settings = Config.Options["settings"].AsObjectView();
+ std::string_view Spec;
if (!Config.TargetSpecification.empty())
{
- if (StrCaseCompare(Config.TargetSpecification.substr(0, FileHydratorPrefix.length()), FileHydratorPrefix) == 0)
- {
- Storage = std::make_unique<FileStorage>();
- }
- else if (StrCaseCompare(Config.TargetSpecification.substr(0, S3HydratorPrefix.length()), S3HydratorPrefix) == 0)
- {
- Storage = std::make_unique<S3Storage>();
- }
- else
- {
- throw std::runtime_error(fmt::format("Unknown hydration strategy: {}", Config.TargetSpecification));
- }
+ Spec = Config.TargetSpecification;
+ Spec.remove_prefix(S3Storage::Prefix.size());
}
else
{
- std::string_view Type = Config.Options["type"].AsString();
- if (Type == FileHydratorType)
+ std::string_view Uri = Settings["uri"].AsString();
+ if (Uri.empty())
{
- Storage = std::make_unique<FileStorage>();
+ throw zen::runtime_error("Incremental S3 hydration config requires 'settings.uri'");
}
- else if (Type == S3HydratorType)
- {
- Storage = std::make_unique<S3Storage>();
- }
- else if (!Type.empty())
+ Spec = Uri;
+ Spec.remove_prefix(S3Storage::Prefix.size());
+ }
+
+ size_t SlashPos = Spec.find('/');
+ m_Bucket = std::string(SlashPos != std::string_view::npos ? Spec.substr(0, SlashPos) : Spec);
+ m_KeyPrefixRoot = SlashPos != std::string_view::npos ? std::string(Spec.substr(SlashPos + 1)) : std::string{};
+
+ if (m_Bucket.empty())
+ {
+ throw zen::runtime_error("Incremental S3 hydration config requires a bucket name");
+ }
+
+ std::string Region = std::string(Settings["region"].AsString());
+ if (Region.empty())
+ {
+ Region = GetEnvVariable("AWS_DEFAULT_REGION");
+ }
+ if (Region.empty())
+ {
+ Region = GetEnvVariable("AWS_REGION");
+ }
+ if (Region.empty())
+ {
+ Region = "us-east-1";
+ }
+ m_Region = std::move(Region);
+
+ std::string_view Endpoint = Settings["endpoint"].AsString();
+ if (!Endpoint.empty())
+ {
+ m_Endpoint = std::string(Endpoint);
+ m_PathStyle = Settings["path-style"].AsBool();
+ }
+
+ std::string AccessKeyId = GetEnvVariable("AWS_ACCESS_KEY_ID");
+ if (AccessKeyId.empty())
+ {
+ m_CredentialProvider = Ref<ImdsCredentialProvider>(new ImdsCredentialProvider({}));
+ }
+ else
+ {
+ m_Credentials.AccessKeyId = std::move(AccessKeyId);
+ m_Credentials.SecretAccessKey = GetEnvVariable("AWS_SECRET_ACCESS_KEY");
+ m_Credentials.SessionToken = GetEnvVariable("AWS_SESSION_TOKEN");
+ }
+
+ m_DefaultMultipartChunkSize = Settings["chunk-size"].AsUInt64(S3Storage::DefaultMultipartChunkSize);
+
+ S3ClientOptions ClientOptions;
+ ClientOptions.BucketName = m_Bucket;
+ ClientOptions.Region = m_Region;
+ ClientOptions.Endpoint = m_Endpoint;
+ ClientOptions.PathStyle = m_PathStyle;
+ if (m_CredentialProvider)
+ {
+ ClientOptions.CredentialProvider = m_CredentialProvider;
+ }
+ else
+ {
+ ClientOptions.Credentials = m_Credentials;
+ }
+ ClientOptions.HttpSettings.MaximumInMemoryDownloadSize = 16u * 1024u;
+
+ m_Client = std::make_unique<S3Client>(ClientOptions);
+}
+
+std::unique_ptr<HydrationStrategyBase>
+S3Hydration::CreateHydrator(const HydrationConfig& Config)
+{
+ using namespace hydration_impl;
+ std::string KeyPrefix = m_KeyPrefixRoot.empty() ? std::string(Config.ModuleId) : fmt::format("{}/{}", m_KeyPrefixRoot, Config.ModuleId);
+ return std::make_unique<IncrementalHydrator>(
+ Config,
+ std::make_unique<S3Storage>(*m_Client, std::move(KeyPrefix), Config.TempDir, m_DefaultMultipartChunkSize));
+}
+
+std::unique_ptr<HydrationBase>
+InitHydration(const HydrationBase::Configuration& Config)
+{
+ using namespace hydration_impl;
+
+ if (!Config.TargetSpecification.empty())
+ {
+ if (StrCaseCompare(Config.TargetSpecification.substr(0, FileStorage::Prefix.length()), FileStorage::Prefix) == 0)
{
- throw zen::runtime_error("Unknown hydration target type '{}'", Type);
+ return std::make_unique<FileHydration>(Config);
}
- else
+ if (StrCaseCompare(Config.TargetSpecification.substr(0, S3Storage::Prefix.length()), S3Storage::Prefix) == 0)
{
- throw zen::runtime_error("No hydration target configured");
+ return std::make_unique<S3Hydration>(Config);
}
+ throw zen::runtime_error("Unknown hydration strategy: {}", Config.TargetSpecification);
}
- auto Hydrator = std::make_unique<IncrementalHydrator>(std::move(Storage));
- Hydrator->Configure(Config);
- return Hydrator;
+ std::string_view Type = Config.Options["type"].AsString();
+ if (Type == FileStorage::Type)
+ {
+ return std::make_unique<FileHydration>(Config);
+ }
+ if (Type == S3Storage::Type)
+ {
+ return std::make_unique<S3Hydration>(Config);
+ }
+ if (!Type.empty())
+ {
+ throw zen::runtime_error("Unknown hydration target type '{}'", Type);
+ }
+ throw zen::runtime_error("No hydration target configured");
}
#if ZEN_WITH_TESTS
@@ -1226,27 +1297,19 @@ TEST_CASE("hydration.file.dehydrate_hydrate")
const std::string ModuleId = "testmodule";
auto TestFiles = CreateSmallTestTree(ServerStateDir);
- HydrationConfig Config;
- Config.ServerStateDir = ServerStateDir;
- Config.TempDir = HydrationTemp;
- Config.ModuleId = ModuleId;
- Config.TargetSpecification = "file://" + HydrationStore.string();
+ auto Hydration = InitHydration({.TargetSpecification = "file://" + HydrationStore.string()});
+
+ HydrationConfig Config{.ServerStateDir = ServerStateDir, .TempDir = HydrationTemp, .ModuleId = ModuleId};
// Dehydrate: copy server state to file store
- {
- std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
- Hydrator->Dehydrate(CbObject());
- }
+ Hydration->CreateHydrator(Config)->Dehydrate(CbObject());
// Verify the module folder exists in the store and ServerStateDir was wiped
CHECK(std::filesystem::exists(HydrationStore / ModuleId));
CHECK(std::filesystem::is_empty(ServerStateDir));
// Hydrate: restore server state from file store
- {
- std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
- Hydrator->Hydrate();
- }
+ Hydration->CreateHydrator(Config)->Hydrate();
// Verify restored contents match the original
VerifyTree(ServerStateDir, TestFiles);
@@ -1265,26 +1328,17 @@ TEST_CASE("hydration.file.hydrate_overwrites_existing_state")
auto TestFiles = CreateSmallTestTree(ServerStateDir);
- HydrationConfig Config;
- Config.ServerStateDir = ServerStateDir;
- Config.TempDir = HydrationTemp;
- Config.ModuleId = "testmodule";
- Config.TargetSpecification = "file://" + HydrationStore.string();
+ auto Hydration = InitHydration({.TargetSpecification = "file://" + HydrationStore.string()});
- // Dehydrate the original state
- {
- std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
- Hydrator->Dehydrate(CbObject());
- }
+ HydrationConfig Config{.ServerStateDir = ServerStateDir, .TempDir = HydrationTemp, .ModuleId = "testmodule"};
+
+ Hydration->CreateHydrator(Config)->Dehydrate(CbObject());
// Put a stale file in ServerStateDir to simulate leftover state
WriteFile(ServerStateDir / "stale.bin", CreateSemiRandomBlob(256));
// Hydrate - must wipe stale file and restore original
- {
- std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
- Hydrator->Hydrate();
- }
+ Hydration->CreateHydrator(Config)->Hydrate();
CHECK_FALSE(std::filesystem::exists(ServerStateDir / "stale.bin"));
VerifyTree(ServerStateDir, TestFiles);
@@ -1309,23 +1363,15 @@ TEST_CASE("hydration.file.excluded_files_not_dehydrated")
WriteFile(ServerStateDir / ".sentry-native" / "db.lock", CreateSemiRandomBlob(32));
WriteFile(ServerStateDir / ".sentry-native" / "breadcrumb.json", CreateSemiRandomBlob(128));
- HydrationConfig Config;
- Config.ServerStateDir = ServerStateDir;
- Config.TempDir = HydrationTemp;
- Config.ModuleId = "testmodule_excl";
- Config.TargetSpecification = "file://" + HydrationStore.string();
+ auto Hydration = InitHydration({.TargetSpecification = "file://" + HydrationStore.string()});
- {
- std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
- Hydrator->Dehydrate(CbObject());
- }
+ HydrationConfig Config{.ServerStateDir = ServerStateDir, .TempDir = HydrationTemp, .ModuleId = "testmodule_excl"};
+
+ Hydration->CreateHydrator(Config)->Dehydrate(CbObject());
// Hydrate into a clean directory
CleanDirectory(ServerStateDir, true);
- {
- std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
- Hydrator->Hydrate();
- }
+ Hydration->CreateHydrator(Config)->Hydrate();
// Normal files must be restored
VerifyTree(ServerStateDir, TestFiles);
@@ -1352,17 +1398,12 @@ TEST_CASE("hydration.file.obliterate")
const std::string ModuleId = "obliterate_test";
CreateSmallTestTree(ServerStateDir);
- HydrationConfig Config;
- Config.ServerStateDir = ServerStateDir;
- Config.TempDir = HydrationTemp;
- Config.ModuleId = ModuleId;
- Config.TargetSpecification = "file://" + HydrationStore.string();
+ auto Hydration = InitHydration({.TargetSpecification = "file://" + HydrationStore.string()});
+
+ HydrationConfig Config{.ServerStateDir = ServerStateDir, .TempDir = HydrationTemp, .ModuleId = ModuleId};
// Dehydrate so the backend store has data
- {
- std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
- Hydrator->Dehydrate(CbObject());
- }
+ Hydration->CreateHydrator(Config)->Dehydrate(CbObject());
CHECK(std::filesystem::exists(HydrationStore / ModuleId));
// Put some files back in ServerStateDir and TempDir to verify cleanup
@@ -1370,10 +1411,7 @@ TEST_CASE("hydration.file.obliterate")
WriteFile(HydrationTemp / "leftover.tmp", CreateSemiRandomBlob(64));
// Obliterate
- {
- std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
- Hydrator->Obliterate();
- }
+ Hydration->CreateHydrator(Config)->Obliterate();
// Backend store directory deleted
CHECK_FALSE(std::filesystem::exists(HydrationStore / ModuleId));
@@ -1406,6 +1444,8 @@ TEST_CASE("hydration.file.concurrent")
};
std::vector<ModuleData> Modules(kModuleCount);
+ auto Hydration = InitHydration({.TargetSpecification = "file://" + HydrationStore.string()});
+
for (int I = 0; I < kModuleCount; ++I)
{
std::string ModuleId = fmt::format("file_concurrent_{}", I);
@@ -1414,12 +1454,11 @@ TEST_CASE("hydration.file.concurrent")
CreateDirectories(StateDir);
CreateDirectories(TempPath);
- Modules[I].Config.ServerStateDir = StateDir;
- Modules[I].Config.TempDir = TempPath;
- Modules[I].Config.ModuleId = ModuleId;
- Modules[I].Config.TargetSpecification = "file://" + HydrationStore.string();
- Modules[I].Config.Threading = Threading.Options;
- Modules[I].Files = CreateSmallTestTree(StateDir);
+ Modules[I].Config.ServerStateDir = StateDir;
+ Modules[I].Config.TempDir = TempPath;
+ Modules[I].Config.ModuleId = ModuleId;
+ Modules[I].Config.Threading = Threading.Options;
+ Modules[I].Files = CreateSmallTestTree(StateDir);
}
// Concurrent dehydrate
@@ -1431,9 +1470,8 @@ TEST_CASE("hydration.file.concurrent")
for (int I = 0; I < kModuleCount; ++I)
{
- Work.ScheduleWork(Pool, [&Config = Modules[I].Config](std::atomic<bool>&) {
- std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
- Hydrator->Dehydrate(CbObject());
+ Work.ScheduleWork(Pool, [&Hydration, &Config = Modules[I].Config](std::atomic<bool>&) {
+ Hydration->CreateHydrator(Config)->Dehydrate(CbObject());
});
}
Work.Wait();
@@ -1449,9 +1487,8 @@ TEST_CASE("hydration.file.concurrent")
for (int I = 0; I < kModuleCount; ++I)
{
- Work.ScheduleWork(Pool, [&Config = Modules[I].Config](std::atomic<bool>&) {
- std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
- Hydrator->Hydrate();
+ Work.ScheduleWork(Pool, [&Hydration, &Config = Modules[I].Config](std::atomic<bool>&) {
+ Hydration->CreateHydrator(Config)->Hydrate();
});
}
Work.Wait();
@@ -1490,10 +1527,7 @@ TEST_CASE("hydration.s3.dehydrate_hydrate")
CreateDirectories(ServerStateDir);
CreateDirectories(HydrationTemp);
- HydrationConfig Config;
- Config.ServerStateDir = ServerStateDir;
- Config.TempDir = HydrationTemp;
- Config.ModuleId = "s3test_roundtrip";
+ HydrationBase::Configuration BaseConfig;
{
std::string ConfigJson =
fmt::format(R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test","endpoint":"{}","path-style":true}}}})",
@@ -1501,39 +1535,30 @@ TEST_CASE("hydration.s3.dehydrate_hydrate")
std::string ParseError;
CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError);
ZEN_ASSERT(ParseError.empty() && Root.IsObject());
- Config.Options = std::move(Root).AsObject();
+ BaseConfig.Options = std::move(Root).AsObject();
}
+ auto Hydration = InitHydration(BaseConfig);
+
+ HydrationConfig Config{.ServerStateDir = ServerStateDir, .TempDir = HydrationTemp, .ModuleId = "s3test_roundtrip"};
// Hydrate with no prior S3 state (first-boot path). Pre-populate ServerStateDir
// with a stale file to confirm the cleanup branch wipes it.
WriteFile(ServerStateDir / "stale.bin", CreateSemiRandomBlob(256));
- {
- std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
- Hydrator->Hydrate();
- }
+ Hydration->CreateHydrator(Config)->Hydrate();
CHECK(std::filesystem::is_empty(ServerStateDir));
// v1: dehydrate without a marker file
CreateSmallTestTree(ServerStateDir);
- {
- std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
- Hydrator->Dehydrate(CbObject());
- }
+ Hydration->CreateHydrator(Config)->Dehydrate(CbObject());
// v2: dehydrate WITH a marker file that only v2 has
CreateSmallTestTree(ServerStateDir);
WriteFile(ServerStateDir / "v2marker.bin", CreateSemiRandomBlob(64));
- {
- std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
- Hydrator->Dehydrate(CbObject());
- }
+ Hydration->CreateHydrator(Config)->Dehydrate(CbObject());
// Hydrate must restore v2 (the latest dehydrated state)
CleanDirectory(ServerStateDir, true);
- {
- std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
- Hydrator->Hydrate();
- }
+ Hydration->CreateHydrator(Config)->Hydrate();
// v2 marker must be present - confirms the second dehydration overwrote the first
CHECK(std::filesystem::exists(ServerStateDir / "v2marker.bin"));
@@ -1568,6 +1593,18 @@ TEST_CASE("hydration.s3.concurrent")
};
std::vector<ModuleData> Modules(kModuleCount);
+ HydrationBase::Configuration BaseConfig;
+ {
+ std::string ConfigJson =
+ fmt::format(R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test","endpoint":"{}","path-style":true}}}})",
+ Minio.Endpoint());
+ std::string ParseError;
+ CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError);
+ ZEN_ASSERT(ParseError.empty() && Root.IsObject());
+ BaseConfig.Options = std::move(Root).AsObject();
+ }
+ auto Hydration = InitHydration(BaseConfig);
+
for (int I = 0; I < kModuleCount; ++I)
{
std::string ModuleId = fmt::format("s3_concurrent_{}", I);
@@ -1580,16 +1617,7 @@ TEST_CASE("hydration.s3.concurrent")
Modules[I].Config.TempDir = TempPath;
Modules[I].Config.ModuleId = ModuleId;
Modules[I].Config.Threading = Threading.Options;
- {
- std::string ConfigJson =
- fmt::format(R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test","endpoint":"{}","path-style":true}}}})",
- Minio.Endpoint());
- std::string ParseError;
- CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError);
- ZEN_ASSERT(ParseError.empty() && Root.IsObject());
- Modules[I].Config.Options = std::move(Root).AsObject();
- }
- Modules[I].Files = CreateTestTree(StateDir);
+ Modules[I].Files = CreateTestTree(StateDir);
}
// Concurrent dehydrate
@@ -1601,9 +1629,8 @@ TEST_CASE("hydration.s3.concurrent")
for (int I = 0; I < kModuleCount; ++I)
{
- Work.ScheduleWork(Pool, [&Config = Modules[I].Config](std::atomic<bool>&) {
- std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
- Hydrator->Dehydrate(CbObject());
+ Work.ScheduleWork(Pool, [&Hydration, &Config = Modules[I].Config](std::atomic<bool>&) {
+ Hydration->CreateHydrator(Config)->Dehydrate(CbObject());
});
}
Work.Wait();
@@ -1619,10 +1646,9 @@ TEST_CASE("hydration.s3.concurrent")
for (int I = 0; I < kModuleCount; ++I)
{
- Work.ScheduleWork(Pool, [&Config = Modules[I].Config](std::atomic<bool>&) {
+ Work.ScheduleWork(Pool, [&Hydration, &Config = Modules[I].Config](std::atomic<bool>&) {
CleanDirectory(Config.ServerStateDir, true);
- std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
- Hydrator->Hydrate();
+ Hydration->CreateHydrator(Config)->Hydrate();
});
}
Work.Wait();
@@ -1656,10 +1682,7 @@ TEST_CASE("hydration.s3.obliterate")
const std::string ModuleId = "s3test_obliterate";
- HydrationConfig Config;
- Config.ServerStateDir = ServerStateDir;
- Config.TempDir = HydrationTemp;
- Config.ModuleId = ModuleId;
+ HydrationBase::Configuration BaseConfig;
{
std::string ConfigJson =
fmt::format(R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test","endpoint":"{}","path-style":true}}}})",
@@ -1667,15 +1690,15 @@ TEST_CASE("hydration.s3.obliterate")
std::string ParseError;
CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError);
ZEN_ASSERT(ParseError.empty() && Root.IsObject());
- Config.Options = std::move(Root).AsObject();
+ BaseConfig.Options = std::move(Root).AsObject();
}
+ auto Hydration = InitHydration(BaseConfig);
+
+ HydrationConfig Config{.ServerStateDir = ServerStateDir, .TempDir = HydrationTemp, .ModuleId = ModuleId};
// Dehydrate to populate backend
CreateSmallTestTree(ServerStateDir);
- {
- std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
- Hydrator->Dehydrate(CbObject());
- }
+ Hydration->CreateHydrator(Config)->Dehydrate(CbObject());
auto ListModuleObjects = [&]() {
S3ClientOptions Opts;
@@ -1696,10 +1719,7 @@ TEST_CASE("hydration.s3.obliterate")
WriteFile(HydrationTemp / "leftover.tmp", CreateSemiRandomBlob(64));
// Obliterate
- {
- std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
- Hydrator->Obliterate();
- }
+ Hydration->CreateHydrator(Config)->Obliterate();
// Verify S3 objects deleted
CHECK(ListModuleObjects().Objects.empty());
@@ -1731,10 +1751,7 @@ TEST_CASE("hydration.s3.config_overrides")
{
auto TestFiles = CreateSmallTestTree(ServerStateDir);
- HydrationConfig Config;
- Config.ServerStateDir = ServerStateDir;
- Config.TempDir = HydrationTemp;
- Config.ModuleId = "s3test_prefix";
+ HydrationBase::Configuration BaseConfig;
{
std::string ConfigJson = fmt::format(
R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test/team/project","endpoint":"{}","path-style":true}}}})",
@@ -1742,20 +1759,17 @@ TEST_CASE("hydration.s3.config_overrides")
std::string ParseError;
CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError);
ZEN_ASSERT(ParseError.empty() && Root.IsObject());
- Config.Options = std::move(Root).AsObject();
+ BaseConfig.Options = std::move(Root).AsObject();
}
+ auto Hydration = InitHydration(BaseConfig);
- {
- std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
- Hydrator->Dehydrate(CbObject());
- }
+ HydrationConfig Config{.ServerStateDir = ServerStateDir, .TempDir = HydrationTemp, .ModuleId = "s3test_prefix"};
+
+ Hydration->CreateHydrator(Config)->Dehydrate(CbObject());
CleanDirectory(ServerStateDir, true);
- {
- std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
- Hydrator->Hydrate();
- }
+ Hydration->CreateHydrator(Config)->Hydrate();
VerifyTree(ServerStateDir, TestFiles);
}
@@ -1768,10 +1782,7 @@ TEST_CASE("hydration.s3.config_overrides")
ScopedEnvVar EnvRegion("AWS_DEFAULT_REGION", "wrong-region");
- HydrationConfig Config;
- Config.ServerStateDir = ServerStateDir;
- Config.TempDir = HydrationTemp;
- Config.ModuleId = "s3test_region_override";
+ HydrationBase::Configuration BaseConfig;
{
std::string ConfigJson = fmt::format(
R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test","endpoint":"{}","path-style":true,"region":"us-east-1"}}}})",
@@ -1779,20 +1790,17 @@ TEST_CASE("hydration.s3.config_overrides")
std::string ParseError;
CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError);
ZEN_ASSERT(ParseError.empty() && Root.IsObject());
- Config.Options = std::move(Root).AsObject();
+ BaseConfig.Options = std::move(Root).AsObject();
}
+ auto Hydration = InitHydration(BaseConfig);
- {
- std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
- Hydrator->Dehydrate(CbObject());
- }
+ HydrationConfig Config{.ServerStateDir = ServerStateDir, .TempDir = HydrationTemp, .ModuleId = "s3test_region_override"};
+
+ Hydration->CreateHydrator(Config)->Dehydrate(CbObject());
CleanDirectory(ServerStateDir, true);
- {
- std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
- Hydrator->Hydrate();
- }
+ Hydration->CreateHydrator(Config)->Hydrate();
VerifyTree(ServerStateDir, TestFiles);
}
@@ -1822,25 +1830,26 @@ TEST_CASE("hydration.s3.dehydrate_hydrate.performance" * doctest::skip())
TestThreading Threading(4);
- HydrationConfig Config;
- Config.ServerStateDir = ServerStateDir;
- Config.TempDir = HydrationTemp;
- Config.ModuleId = ModuleId;
- Config.Threading = Threading.Options;
- std::string ConfigJson =
- fmt::format(R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test","endpoint":"{}","path-style":true}}}})",
- Minio.Endpoint());
- std::string ParseError;
- CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError);
- ZEN_ASSERT(ParseError.empty() && Root.IsObject());
- Config.Options = std::move(Root).AsObject();
-
- // Dehydrate: upload server state to MinIO
+ HydrationBase::Configuration BaseConfig;
{
- ZEN_INFO("============== DEHYDRATE ==============");
- std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
- Hydrator->Dehydrate(CbObject());
+ std::string ConfigJson =
+ fmt::format(R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test","endpoint":"{}","path-style":true}}}})",
+ Minio.Endpoint());
+ std::string ParseError;
+ CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError);
+ ZEN_ASSERT(ParseError.empty() && Root.IsObject());
+ BaseConfig.Options = std::move(Root).AsObject();
}
+ auto Hydration = InitHydration(BaseConfig);
+
+ HydrationConfig Config{.ServerStateDir = ServerStateDir,
+ .TempDir = HydrationTemp,
+ .ModuleId = ModuleId,
+ .Threading = Threading.Options};
+
+ // Dehydrate: upload server state to MinIO
+ ZEN_INFO("============== DEHYDRATE ==============");
+ Hydration->CreateHydrator(Config)->Dehydrate(CbObject());
for (size_t I = 0; I < 1; I++)
{
@@ -1849,11 +1858,8 @@ TEST_CASE("hydration.s3.dehydrate_hydrate.performance" * doctest::skip())
CHECK(std::filesystem::is_empty(ServerStateDir));
// Hydrate: download from MinIO back to server state
- {
- ZEN_INFO("=============== HYDRATE ===============");
- std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
- Hydrator->Hydrate();
- }
+ ZEN_INFO("=============== HYDRATE ===============");
+ Hydration->CreateHydrator(Config)->Hydrate();
}
}
@@ -1880,23 +1886,16 @@ TEST_CASE("hydration.file.incremental")
TestThreading Threading(4);
- HydrationConfig Config;
- Config.ServerStateDir = ServerStateDir;
- Config.TempDir = HydrationTemp;
- Config.ModuleId = ModuleId;
- Config.TargetSpecification = "file://" + HydrationStore.string();
- Config.Threading = Threading.Options;
+ auto Hydration = InitHydration({.TargetSpecification = "file://" + HydrationStore.string()});
- std::unique_ptr<StorageBase> Storage = std::make_unique<FileStorage>();
- std::unique_ptr<HydrationStrategyBase> Hydrator = std::make_unique<IncrementalHydrator>(std::move(Storage));
+ HydrationConfig Config{.ServerStateDir = ServerStateDir,
+ .TempDir = HydrationTemp,
+ .ModuleId = ModuleId,
+ .Threading = Threading.Options};
// Hydrate with no prior state
- CbObject HydrationState;
- {
- Hydrator->Configure(Config);
- HydrationState = Hydrator->Hydrate();
- CHECK_FALSE(HydrationState);
- }
+ CbObject HydrationState = Hydration->CreateHydrator(Config)->Hydrate();
+ CHECK_FALSE(HydrationState);
# ifdef REAL_DATA_PATH
ZEN_INFO("Writing state data...");
@@ -1906,54 +1905,33 @@ TEST_CASE("hydration.file.incremental")
// Create test files and dehydrate
auto TestFiles = CreateTestTree(ServerStateDir);
# endif
- {
- Hydrator->Configure(Config);
- Hydrator->Dehydrate(HydrationState);
- }
+ Hydration->CreateHydrator(Config)->Dehydrate(HydrationState);
CHECK(std::filesystem::is_empty(ServerStateDir));
- // Hydrate: restore from S3
- {
- Hydrator->Configure(Config);
- HydrationState = Hydrator->Hydrate();
- }
+ // Hydrate: restore from file store
+ HydrationState = Hydration->CreateHydrator(Config)->Hydrate();
# ifndef REAL_DATA_PATH
VerifyTree(ServerStateDir, TestFiles);
# endif
// Dehydrate again with cached state (should skip re-uploading unchanged files)
- {
- Hydrator->Configure(Config);
- Hydrator->Dehydrate(HydrationState);
- }
+ Hydration->CreateHydrator(Config)->Dehydrate(HydrationState);
CHECK(std::filesystem::is_empty(ServerStateDir));
// Hydrate one more time to confirm second dehydrate produced valid state
- {
- Hydrator->Configure(Config);
- HydrationState = Hydrator->Hydrate();
- }
+ HydrationState = Hydration->CreateHydrator(Config)->Hydrate();
// Replace files and dehydrate
TestFiles = CreateTestTree(ServerStateDir);
- {
- Hydrator->Configure(Config);
- Hydrator->Dehydrate(HydrationState);
- }
+ Hydration->CreateHydrator(Config)->Dehydrate(HydrationState);
// Hydrate one more time to confirm second dehydrate produced valid state
- {
- Hydrator->Configure(Config);
- HydrationState = Hydrator->Hydrate();
- }
+ HydrationState = Hydration->CreateHydrator(Config)->Hydrate();
# ifndef REAL_DATA_PATH
VerifyTree(ServerStateDir, TestFiles);
# endif // 0
// Dehydrate, nothing touched - no hashing, no upload
- {
- Hydrator->Configure(Config);
- Hydrator->Dehydrate(HydrationState);
- }
+ Hydration->CreateHydrator(Config)->Dehydrate(HydrationState);
}
// ---------------------------------------------------------------------------
@@ -1986,11 +1964,7 @@ TEST_CASE("hydration.s3.incremental")
TestThreading Threading(8);
- HydrationConfig Config;
- Config.ServerStateDir = ServerStateDir;
- Config.TempDir = HydrationTemp;
- Config.ModuleId = ModuleId;
- Config.Threading = Threading.Options;
+ HydrationBase::Configuration BaseConfig;
{
std::string ConfigJson =
fmt::format(R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test","endpoint":"{}","path-style":true}}}})",
@@ -1998,19 +1972,18 @@ TEST_CASE("hydration.s3.incremental")
std::string ParseError;
CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError);
ZEN_ASSERT(ParseError.empty() && Root.IsObject());
- Config.Options = std::move(Root).AsObject();
+ BaseConfig.Options = std::move(Root).AsObject();
}
+ auto Hydration = InitHydration(BaseConfig);
- std::unique_ptr<StorageBase> Storage = std::make_unique<S3Storage>();
- std::unique_ptr<HydrationStrategyBase> Hydrator = std::make_unique<IncrementalHydrator>(std::move(Storage));
+ HydrationConfig Config{.ServerStateDir = ServerStateDir,
+ .TempDir = HydrationTemp,
+ .ModuleId = ModuleId,
+ .Threading = Threading.Options};
// Hydrate with no prior state
- CbObject HydrationState;
- {
- Hydrator->Configure(Config);
- HydrationState = Hydrator->Hydrate();
- CHECK_FALSE(HydrationState);
- }
+ CbObject HydrationState = Hydration->CreateHydrator(Config)->Hydrate();
+ CHECK_FALSE(HydrationState);
# ifdef REAL_DATA_PATH
ZEN_INFO("Writing state data...");
@@ -2020,83 +1993,51 @@ TEST_CASE("hydration.s3.incremental")
// Create test files and dehydrate
auto TestFiles = CreateTestTree(ServerStateDir);
# endif
- {
- Hydrator->Configure(Config);
- Hydrator->Dehydrate(HydrationState);
- }
+ Hydration->CreateHydrator(Config)->Dehydrate(HydrationState);
CHECK(std::filesystem::is_empty(ServerStateDir));
// Hydrate: restore from S3
- {
- Hydrator->Configure(Config);
- HydrationState = Hydrator->Hydrate();
- }
+ HydrationState = Hydration->CreateHydrator(Config)->Hydrate();
# ifndef REAL_DATA_PATH
VerifyTree(ServerStateDir, TestFiles);
# endif
// Dehydrate again with cached state (should skip re-uploading unchanged files)
- {
- Hydrator->Configure(Config);
- Hydrator->Dehydrate(HydrationState);
- }
+ Hydration->CreateHydrator(Config)->Dehydrate(HydrationState);
CHECK(std::filesystem::is_empty(ServerStateDir));
// Hydrate one more time to confirm second dehydrate produced valid state
- {
- Hydrator->Configure(Config);
- HydrationState = Hydrator->Hydrate();
- }
+ HydrationState = Hydration->CreateHydrator(Config)->Hydrate();
// Replace files and dehydrate
TestFiles = CreateTestTree(ServerStateDir);
- {
- Hydrator->Configure(Config);
- Hydrator->Dehydrate(HydrationState);
- }
+ Hydration->CreateHydrator(Config)->Dehydrate(HydrationState);
// Hydrate one more time to confirm second dehydrate produced valid state
- {
- Hydrator->Configure(Config);
- HydrationState = Hydrator->Hydrate();
- }
+ HydrationState = Hydration->CreateHydrator(Config)->Hydrate();
# ifndef REAL_DATA_PATH
VerifyTree(ServerStateDir, TestFiles);
# endif // 0
// Dehydrate, nothing touched - no hashing, no upload
- {
- Hydrator->Configure(Config);
- Hydrator->Dehydrate(HydrationState);
- }
+ Hydration->CreateHydrator(Config)->Dehydrate(HydrationState);
}
TEST_CASE("hydration.create_hydrator_rejects_invalid_config")
{
- ScopedTemporaryDirectory TempDir;
-
- HydrationConfig Config;
- Config.ServerStateDir = TempDir.Path() / "state";
- Config.TempDir = TempDir.Path() / "temp";
- Config.ModuleId = "invalid_test";
-
// Unknown TargetSpecification prefix
- Config.TargetSpecification = "ftp://somewhere";
- CHECK_THROWS(CreateHydrator(Config));
+ CHECK_THROWS(InitHydration({.TargetSpecification = "ftp://somewhere"}));
// Unknown Options type
- Config.TargetSpecification.clear();
{
std::string ParseError;
CbFieldIterator Root = LoadCompactBinaryFromJson(R"({"type":"dynamodb"})", ParseError);
ZEN_ASSERT(ParseError.empty() && Root.IsObject());
- Config.Options = std::move(Root).AsObject();
+ CHECK_THROWS(InitHydration({.Options = std::move(Root).AsObject()}));
}
- CHECK_THROWS(CreateHydrator(Config));
// Empty Options (no type field)
- Config.Options = CbObject();
- CHECK_THROWS(CreateHydrator(Config));
+ CHECK_THROWS(InitHydration({}));
}
TEST_SUITE_END();
diff --git a/src/zenserver/hub/hydration.h b/src/zenserver/hub/hydration.h
index fc2f309b2..0455dda91 100644
--- a/src/zenserver/hub/hydration.h
+++ b/src/zenserver/hub/hydration.h
@@ -4,8 +4,11 @@
#include <zencore/compactbinary.h>
+#include <atomic>
#include <filesystem>
+#include <memory>
#include <optional>
+#include <string>
namespace zen {
@@ -19,16 +22,12 @@ struct HydrationConfig
std::filesystem::path TempDir;
// Module ID of the server state being hydrated/dehydrated
std::string ModuleId;
- // Back-end specific target specification (e.g. S3 bucket, file path, etc)
- std::string TargetSpecification;
- // Full config object when using --hub-hydration-target-config (mutually exclusive with TargetSpecification)
- CbObject Options;
struct ThreadingOptions
{
- WorkerThreadPool* WorkerPool;
- std::atomic<bool>* AbortFlag;
- std::atomic<bool>* PauseFlag;
+ WorkerThreadPool* WorkerPool = nullptr;
+ std::atomic<bool>* AbortFlag = nullptr;
+ std::atomic<bool>* PauseFlag = nullptr;
};
// External threading for parallel I/O and hashing. If not set, work runs inline on the caller's thread.
@@ -41,15 +40,11 @@ struct HydrationConfig
* An instance of this interface is used to perform hydration OR
* dehydration of server state. It's expected to be used only once
* and not reused.
- *
*/
struct HydrationStrategyBase
{
virtual ~HydrationStrategyBase() = default;
- // Set up the hydration target from Config. Must be called before Hydrate/Dehydrate.
- virtual void Configure(const HydrationConfig& Config) = 0;
-
// Upload server state to the configured target. ServerStateDir is wiped on success.
// On failure, ServerStateDir is left intact.
virtual void Dehydrate(const CbObject& CachedState) = 0;
@@ -62,8 +57,36 @@ struct HydrationStrategyBase
virtual void Obliterate() = 0;
};
-// Create a configured hydrator based on Config. Ready to call Hydrate/Dehydrate immediately.
-std::unique_ptr<HydrationStrategyBase> CreateHydrator(const HydrationConfig& Config);
+/**
+ * @brief Hub-wide hydration backend
+ *
+ * Constructed once per hub via InitHydration. Holds the shared connection / client /
+ * credentials state for the configured backend (e.g. a single S3 client and IMDS
+ * credential provider shared by all modules). CreateHydrator produces a ready-to-use
+ * per-module HydrationStrategyBase that references the shared state - no per-module
+ * backend setup cost.
+ */
+class HydrationBase
+{
+public:
+ struct Configuration
+ {
+ // Back-end specific target specification (e.g. "s3://bucket/prefix", "file:///path")
+ std::string TargetSpecification;
+ // Full config object (mutually exclusive with TargetSpecification)
+ CbObject Options;
+ };
+
+ virtual ~HydrationBase() = default;
+
+ // Create a configured per-module hydrator, ready to call Hydrate/Dehydrate/Obliterate.
+ virtual std::unique_ptr<HydrationStrategyBase> CreateHydrator(const HydrationConfig& Config) = 0;
+};
+
+// Factory: parses Config and returns the concrete backend (FileHydration or S3Hydration).
+// Throws zen::runtime_error if the config cannot be resolved to a known backend or if
+// backend-specific validation fails.
+std::unique_ptr<HydrationBase> InitHydration(const HydrationBase::Configuration& Config);
#if ZEN_WITH_TESTS
void hydration_forcelink();
diff --git a/src/zenserver/hub/storageserverinstance.cpp b/src/zenserver/hub/storageserverinstance.cpp
index af2c19113..97edc5223 100644
--- a/src/zenserver/hub/storageserverinstance.cpp
+++ b/src/zenserver/hub/storageserverinstance.cpp
@@ -11,8 +11,12 @@
namespace zen {
-StorageServerInstance::StorageServerInstance(ZenServerEnvironment& RunEnvironment, const Configuration& Config, std::string_view ModuleId)
-: m_Config(Config)
+StorageServerInstance::StorageServerInstance(ZenServerEnvironment& RunEnvironment,
+ HydrationBase& Hydration,
+ const Configuration& Config,
+ std::string_view ModuleId)
+: m_Hydration(Hydration)
+, m_Config(Config)
, m_ModuleId(ModuleId)
, m_ServerInstance(RunEnvironment, ZenServerInstance::ServerMode::kStorageServer)
{
@@ -156,7 +160,7 @@ StorageServerInstance::ObliterateLocked()
std::atomic<bool> AbortFlag{false};
std::atomic<bool> PauseFlag{false};
HydrationConfig Config = MakeHydrationConfig(AbortFlag, PauseFlag);
- std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
+ std::unique_ptr<HydrationStrategyBase> Hydrator = m_Hydration.CreateHydrator(Config);
Hydrator->Obliterate();
}
@@ -204,7 +208,7 @@ StorageServerInstance::Hydrate()
std::atomic<bool> AbortFlag{false};
std::atomic<bool> PauseFlag{false};
HydrationConfig Config = MakeHydrationConfig(AbortFlag, PauseFlag);
- std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
+ std::unique_ptr<HydrationStrategyBase> Hydrator = m_Hydration.CreateHydrator(Config);
m_HydrationState = Hydrator->Hydrate();
}
@@ -214,18 +218,14 @@ StorageServerInstance::Dehydrate()
std::atomic<bool> AbortFlag{false};
std::atomic<bool> PauseFlag{false};
HydrationConfig Config = MakeHydrationConfig(AbortFlag, PauseFlag);
- std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
+ std::unique_ptr<HydrationStrategyBase> Hydrator = m_Hydration.CreateHydrator(Config);
Hydrator->Dehydrate(m_HydrationState);
}
HydrationConfig
StorageServerInstance::MakeHydrationConfig(std::atomic<bool>& AbortFlag, std::atomic<bool>& PauseFlag)
{
- HydrationConfig Config{.ServerStateDir = m_Config.StateDir,
- .TempDir = m_Config.TempDir,
- .ModuleId = m_ModuleId,
- .TargetSpecification = m_Config.HydrationTargetSpecification,
- .Options = m_Config.HydrationOptions};
+ HydrationConfig Config{.ServerStateDir = m_Config.StateDir, .TempDir = m_Config.TempDir, .ModuleId = m_ModuleId};
if (m_Config.OptionalWorkerPool)
{
Config.Threading.emplace(
diff --git a/src/zenserver/hub/storageserverinstance.h b/src/zenserver/hub/storageserverinstance.h
index 80f8a5016..c5917afc9 100644
--- a/src/zenserver/hub/storageserverinstance.h
+++ b/src/zenserver/hub/storageserverinstance.h
@@ -29,8 +29,6 @@ public:
uint16_t BasePort;
std::filesystem::path StateDir;
std::filesystem::path TempDir;
- std::string HydrationTargetSpecification;
- CbObject HydrationOptions;
uint32_t HttpThreadCount = 0; // Automatic
int CoreLimit = 0; // Automatic
std::filesystem::path ConfigPath;
@@ -42,7 +40,10 @@ public:
WorkerThreadPool* OptionalWorkerPool = nullptr;
};
- StorageServerInstance(ZenServerEnvironment& RunEnvironment, const Configuration& Config, std::string_view ModuleId);
+ StorageServerInstance(ZenServerEnvironment& RunEnvironment,
+ HydrationBase& Hydration,
+ const Configuration& Config,
+ std::string_view ModuleId);
~StorageServerInstance();
inline std::string_view GetModuleId() const { return m_ModuleId; }
@@ -140,6 +141,7 @@ private:
void WakeLocked();
mutable RwLock m_Lock;
+ HydrationBase& m_Hydration;
const Configuration m_Config;
std::string m_ModuleId;
ZenServerInstance m_ServerInstance;
diff --git a/src/zenutil/cloud/imdscredentials.cpp b/src/zenutil/cloud/imdscredentials.cpp
index 433afdc3c..a23cb9c28 100644
--- a/src/zenutil/cloud/imdscredentials.cpp
+++ b/src/zenutil/cloud/imdscredentials.cpp
@@ -64,6 +64,7 @@ ImdsCredentialProvider::ImdsCredentialProvider(const ImdsCredentialProviderOptio
.LogCategory = "imds",
.ConnectTimeout = Options.ConnectTimeout,
.Timeout = Options.RequestTimeout,
+ .RetryCount = 3,
})
{
ZEN_INFO("IMDS credential provider configured (endpoint: {})", m_HttpClient.GetBaseUri());
diff --git a/src/zenutil/cloud/s3client.cpp b/src/zenutil/cloud/s3client.cpp
index 3d6dca562..f8bed92da 100644
--- a/src/zenutil/cloud/s3client.cpp
+++ b/src/zenutil/cloud/s3client.cpp
@@ -135,6 +135,41 @@ namespace {
return nullptr;
}
+ /// Extract Code/Message from an S3 XML error body. Returns true if an <Error> element was
+ /// found, even if Code/Message are empty.
+ bool ExtractS3Error(std::string_view Body, std::string_view& OutCode, std::string_view& OutMessage)
+ {
+ if (Body.find("<Error>") == std::string_view::npos)
+ {
+ return false;
+ }
+ OutCode = ExtractXmlValue(Body, "Code");
+ OutMessage = ExtractXmlValue(Body, "Message");
+ return true;
+ }
+
+ /// Build a human-readable error message for a failed S3 response. When the response body
+ /// contains an S3 `<Error>` element, the Code and Message fields are included in the string
+ /// so transient 4xx/5xx failures (SignatureDoesNotMatch, AuthorizationHeaderMalformed, etc.)
+ /// show up in logs instead of being swallowed. Falls back to the generic HTTP/transport
+ /// message when no XML body is available (HEAD responses, transport errors).
+ std::string S3ErrorMessage(std::string_view Prefix, const HttpClient::Response& Response)
+ {
+ if (!Response.Error.has_value() && Response.ResponsePayload)
+ {
+ std::string_view Body(reinterpret_cast<const char*>(Response.ResponsePayload.GetData()), Response.ResponsePayload.GetSize());
+ std::string_view Code;
+ std::string_view Message;
+ if (ExtractS3Error(Body, Code, Message) && (!Code.empty() || !Message.empty()))
+ {
+ ExtendableStringBuilder<256> Decoded;
+ DecodeXmlEntities(Message, Decoded);
+ return fmt::format("{}: HTTP status ({}) {} - {}", Prefix, static_cast<int>(Response.StatusCode), Code, Decoded.ToView());
+ }
+ }
+ return Response.ErrorMessage(Prefix);
+ }
+
} // namespace
std::string_view S3GetObjectResult::NotFoundErrorText = "Not found";
@@ -187,6 +222,14 @@ S3Client::GetCurrentCredentials()
}
std::string
+S3Client::BuildNoCredentialsError(std::string Context)
+{
+ std::string Err = fmt::format("{}: no credentials available", Context);
+ ZEN_WARN("{}", Err);
+ return Err;
+}
+
+std::string
S3Client::BuildEndpoint() const
{
if (!m_Endpoint.empty())
@@ -286,14 +329,13 @@ S3Client::GetSigningKey(std::string_view DateStamp)
}
HttpClient::KeyValueMap
-S3Client::SignRequest(std::string_view Method,
+S3Client::SignRequest(const SigV4Credentials& Credentials,
+ std::string_view Method,
std::string_view Path,
std::string_view CanonicalQueryString,
std::string_view PayloadHash,
std::span<const std::pair<std::string, std::string>> ExtraSignedHeaders)
{
- SigV4Credentials Credentials = GetCurrentCredentials();
-
std::string AmzDate = GetAmzTimestamp();
// Build sorted headers to sign (must be sorted by lowercase name)
@@ -337,17 +379,23 @@ S3Client::SignRequest(std::string_view Method,
S3Result
S3Client::PutObject(std::string_view Key, IoBuffer Content)
{
+ SigV4Credentials Credentials;
+ if (std::string Err = RequireCredentials(Credentials, "S3 PUT '{}' failed", Key); !Err.empty())
+ {
+ return S3Result{std::move(Err)};
+ }
+
std::string Path = KeyToPath(Key);
// Hash the payload
std::string PayloadHash = Sha256ToHex(ComputeSha256(Content.GetData(), Content.GetSize()));
- HttpClient::KeyValueMap Headers = SignRequest("PUT", Path, "", PayloadHash);
+ HttpClient::KeyValueMap Headers = SignRequest(Credentials, "PUT", Path, "", PayloadHash);
HttpClient::Response Response = m_HttpClient.Put(Path, Content, Headers);
if (!Response.IsSuccess())
{
- std::string Err = Response.ErrorMessage("S3 PUT failed");
+ std::string Err = S3ErrorMessage("S3 PUT failed", Response);
ZEN_WARN("S3 PUT '{}' failed: {}", Key, Err);
return S3Result{std::move(Err)};
}
@@ -362,9 +410,15 @@ S3Client::PutObject(std::string_view Key, IoBuffer Content)
S3GetObjectResult
S3Client::GetObject(std::string_view Key, const std::filesystem::path& TempFilePath)
{
+ SigV4Credentials Credentials;
+ if (std::string Err = RequireCredentials(Credentials, "S3 GET '{}' failed", Key); !Err.empty())
+ {
+ return S3GetObjectResult{S3Result{std::move(Err)}, {}};
+ }
+
std::string Path = KeyToPath(Key);
- HttpClient::KeyValueMap Headers = SignRequest("GET", Path, "", EmptyPayloadHash);
+ HttpClient::KeyValueMap Headers = SignRequest(Credentials, "GET", Path, "", EmptyPayloadHash);
HttpClient::Response Response = m_HttpClient.Download(Path, TempFilePath, Headers);
if (!Response.IsSuccess())
@@ -374,7 +428,7 @@ S3Client::GetObject(std::string_view Key, const std::filesystem::path& TempFileP
return S3GetObjectResult{S3Result{.Error = std::string(S3GetObjectResult::NotFoundErrorText)}, {}};
}
- std::string Err = Response.ErrorMessage("S3 GET failed");
+ std::string Err = S3ErrorMessage("S3 GET failed", Response);
ZEN_WARN("S3 GET '{}' failed: {}", Key, Err);
return S3GetObjectResult{S3Result{std::move(Err)}, {}};
}
@@ -390,9 +444,17 @@ S3GetObjectResult
S3Client::GetObjectRange(std::string_view Key, uint64_t RangeStart, uint64_t RangeSize)
{
ZEN_ASSERT(RangeSize > 0);
+
+ SigV4Credentials Credentials;
+ if (std::string Err = RequireCredentials(Credentials, "S3 GET range '{}' [{}-{}] failed", Key, RangeStart, RangeStart + RangeSize - 1);
+ !Err.empty())
+ {
+ return S3GetObjectResult{S3Result{std::move(Err)}, {}};
+ }
+
std::string Path = KeyToPath(Key);
- HttpClient::KeyValueMap Headers = SignRequest("GET", Path, "", EmptyPayloadHash);
+ HttpClient::KeyValueMap Headers = SignRequest(Credentials, "GET", Path, "", EmptyPayloadHash);
Headers->emplace("Range", fmt::format("bytes={}-{}", RangeStart, RangeStart + RangeSize - 1));
HttpClient::Response Response = m_HttpClient.Get(Path, Headers);
@@ -403,7 +465,7 @@ S3Client::GetObjectRange(std::string_view Key, uint64_t RangeStart, uint64_t Ran
return S3GetObjectResult{S3Result{.Error = std::string(S3GetObjectResult::NotFoundErrorText)}, {}};
}
- std::string Err = Response.ErrorMessage("S3 GET range failed");
+ std::string Err = S3ErrorMessage("S3 GET range failed", Response);
ZEN_WARN("S3 GET range '{}' [{}-{}] failed: {}", Key, RangeStart, RangeStart + RangeSize - 1, Err);
return S3GetObjectResult{S3Result{std::move(Err)}, {}};
}
@@ -437,14 +499,20 @@ S3Client::GetObjectRange(std::string_view Key, uint64_t RangeStart, uint64_t Ran
S3Result
S3Client::DeleteObject(std::string_view Key)
{
+ SigV4Credentials Credentials;
+ if (std::string Err = RequireCredentials(Credentials, "S3 DELETE '{}' failed", Key); !Err.empty())
+ {
+ return S3Result{std::move(Err)};
+ }
+
std::string Path = KeyToPath(Key);
- HttpClient::KeyValueMap Headers = SignRequest("DELETE", Path, "", EmptyPayloadHash);
+ HttpClient::KeyValueMap Headers = SignRequest(Credentials, "DELETE", Path, "", EmptyPayloadHash);
HttpClient::Response Response = m_HttpClient.Delete(Path, Headers);
if (!Response.IsSuccess())
{
- std::string Err = Response.ErrorMessage("S3 DELETE failed");
+ std::string Err = S3ErrorMessage("S3 DELETE failed", Response);
ZEN_WARN("S3 DELETE '{}' failed: {}", Key, Err);
return S3Result{std::move(Err)};
}
@@ -459,6 +527,12 @@ S3Client::DeleteObject(std::string_view Key)
S3Result
S3Client::Touch(std::string_view Key)
{
+ SigV4Credentials Credentials;
+ if (std::string Err = RequireCredentials(Credentials, "S3 Touch '{}' failed", Key); !Err.empty())
+ {
+ return S3Result{std::move(Err)};
+ }
+
std::string Path = KeyToPath(Key);
// x-amz-copy-source is always "/bucket/key" regardless of addressing style.
@@ -469,23 +543,23 @@ S3Client::Touch(std::string_view Key)
{"x-amz-metadata-directive", "REPLACE"},
}};
- HttpClient::KeyValueMap Headers = SignRequest("PUT", Path, "", EmptyPayloadHash, ExtraSigned);
+ HttpClient::KeyValueMap Headers = SignRequest(Credentials, "PUT", Path, "", EmptyPayloadHash, ExtraSigned);
HttpClient::Response Response = m_HttpClient.Put(Path, IoBuffer{}, Headers);
if (!Response.IsSuccess())
{
- std::string Err = Response.ErrorMessage("S3 Touch failed");
+ std::string Err = S3ErrorMessage("S3 Touch failed", Response);
ZEN_WARN("S3 Touch '{}' failed: {}", Key, Err);
return S3Result{std::move(Err)};
}
// Copy operations can return HTTP 200 with an error in the XML body.
std::string_view ResponseBody = Response.AsText();
- if (ResponseBody.find("<Error>") != std::string_view::npos)
+ std::string_view ErrorCode;
+ std::string_view ErrorMessage;
+ if (ExtractS3Error(ResponseBody, ErrorCode, ErrorMessage))
{
- std::string_view ErrorCode = ExtractXmlValue(ResponseBody, "Code");
- std::string_view ErrorMessage = ExtractXmlValue(ResponseBody, "Message");
- std::string Err = fmt::format("S3 Touch '{}' returned error: {} - {}", Key, ErrorCode, ErrorMessage);
+ std::string Err = fmt::format("S3 Touch '{}' returned error: {} - {}", Key, ErrorCode, ErrorMessage);
ZEN_WARN("{}", Err);
return S3Result{std::move(Err)};
}
@@ -500,9 +574,15 @@ S3Client::Touch(std::string_view Key)
S3HeadObjectResult
S3Client::HeadObject(std::string_view Key)
{
+ SigV4Credentials Credentials;
+ if (std::string Err = RequireCredentials(Credentials, "S3 HEAD '{}' failed", Key); !Err.empty())
+ {
+ return S3HeadObjectResult{S3Result{std::move(Err)}, {}, HeadObjectResult::Error};
+ }
+
std::string Path = KeyToPath(Key);
- HttpClient::KeyValueMap Headers = SignRequest("HEAD", Path, "", EmptyPayloadHash);
+ HttpClient::KeyValueMap Headers = SignRequest(Credentials, "HEAD", Path, "", EmptyPayloadHash);
HttpClient::Response Response = m_HttpClient.Head(Path, Headers);
if (!Response.IsSuccess())
@@ -512,7 +592,7 @@ S3Client::HeadObject(std::string_view Key)
return S3HeadObjectResult{{}, {}, HeadObjectResult::NotFound};
}
- std::string Err = Response.ErrorMessage("S3 HEAD failed");
+ std::string Err = S3ErrorMessage("S3 HEAD failed", Response);
ZEN_WARN("S3 HEAD '{}' failed: {}", Key, Err);
return S3HeadObjectResult{S3Result{std::move(Err)}, {}, HeadObjectResult::Error};
}
@@ -551,6 +631,13 @@ S3Client::ListObjects(std::string_view Prefix, uint32_t MaxKeys)
for (;;)
{
+ SigV4Credentials Credentials;
+ if (std::string Err = RequireCredentials(Credentials, "S3 ListObjectsV2 prefix='{}' failed", Prefix); !Err.empty())
+ {
+ Result.Error = std::move(Err);
+ return Result;
+ }
+
// Build query parameters for ListObjectsV2
std::vector<std::pair<std::string, std::string>> QueryParams;
QueryParams.emplace_back("list-type", "2");
@@ -569,13 +656,13 @@ S3Client::ListObjects(std::string_view Prefix, uint32_t MaxKeys)
std::string CanonicalQS = BuildCanonicalQueryString(std::move(QueryParams));
std::string RootPath = BucketRootPath();
- HttpClient::KeyValueMap Headers = SignRequest("GET", RootPath, CanonicalQS, EmptyPayloadHash);
+ HttpClient::KeyValueMap Headers = SignRequest(Credentials, "GET", RootPath, CanonicalQS, EmptyPayloadHash);
std::string FullPath = BuildRequestPath(RootPath, CanonicalQS);
HttpClient::Response Response = m_HttpClient.Get(FullPath, Headers);
if (!Response.IsSuccess())
{
- std::string Err = Response.ErrorMessage("S3 ListObjectsV2 failed");
+ std::string Err = S3ErrorMessage("S3 ListObjectsV2 failed", Response);
ZEN_WARN("S3 ListObjectsV2 prefix='{}' failed: {}", Prefix, Err);
Result.Error = std::move(Err);
return Result;
@@ -654,16 +741,22 @@ S3Client::ListObjects(std::string_view Prefix, uint32_t MaxKeys)
S3CreateMultipartUploadResult
S3Client::CreateMultipartUpload(std::string_view Key)
{
+ SigV4Credentials Credentials;
+ if (std::string Err = RequireCredentials(Credentials, "S3 CreateMultipartUpload '{}' failed", Key); !Err.empty())
+ {
+ return S3CreateMultipartUploadResult{S3Result{std::move(Err)}, {}};
+ }
+
std::string Path = KeyToPath(Key);
std::string CanonicalQS = BuildCanonicalQueryString({{"uploads", ""}});
- HttpClient::KeyValueMap Headers = SignRequest("POST", Path, CanonicalQS, EmptyPayloadHash);
+ HttpClient::KeyValueMap Headers = SignRequest(Credentials, "POST", Path, CanonicalQS, EmptyPayloadHash);
std::string FullPath = BuildRequestPath(Path, CanonicalQS);
HttpClient::Response Response = m_HttpClient.Post(FullPath, Headers);
if (!Response.IsSuccess())
{
- std::string Err = Response.ErrorMessage("S3 CreateMultipartUpload failed");
+ std::string Err = S3ErrorMessage("S3 CreateMultipartUpload failed", Response);
ZEN_WARN("S3 CreateMultipartUpload '{}' failed: {}", Key, Err);
return S3CreateMultipartUploadResult{S3Result{std::move(Err)}, {}};
}
@@ -693,6 +786,12 @@ S3Client::CreateMultipartUpload(std::string_view Key)
S3UploadPartResult
S3Client::UploadPart(std::string_view Key, std::string_view UploadId, uint32_t PartNumber, IoBuffer Content)
{
+ SigV4Credentials Credentials;
+ if (std::string Err = RequireCredentials(Credentials, "S3 UploadPart '{}' part {} failed", Key, PartNumber); !Err.empty())
+ {
+ return S3UploadPartResult{S3Result{std::move(Err)}, {}};
+ }
+
std::string Path = KeyToPath(Key);
std::string CanonicalQS = BuildCanonicalQueryString({
{"partNumber", fmt::format("{}", PartNumber)},
@@ -701,13 +800,13 @@ S3Client::UploadPart(std::string_view Key, std::string_view UploadId, uint32_t P
std::string PayloadHash = Sha256ToHex(ComputeSha256(Content.GetData(), Content.GetSize()));
- HttpClient::KeyValueMap Headers = SignRequest("PUT", Path, CanonicalQS, PayloadHash);
+ HttpClient::KeyValueMap Headers = SignRequest(Credentials, "PUT", Path, CanonicalQS, PayloadHash);
std::string FullPath = BuildRequestPath(Path, CanonicalQS);
HttpClient::Response Response = m_HttpClient.Put(FullPath, Content, Headers);
if (!Response.IsSuccess())
{
- std::string Err = Response.ErrorMessage(fmt::format("S3 UploadPart {} failed", PartNumber));
+ std::string Err = S3ErrorMessage(fmt::format("S3 UploadPart {} failed", PartNumber), Response);
ZEN_WARN("S3 UploadPart '{}' part {} failed: {}", Key, PartNumber, Err);
return S3UploadPartResult{S3Result{std::move(Err)}, {}};
}
@@ -733,6 +832,12 @@ S3Client::CompleteMultipartUpload(std::string_view Key,
std::string_view UploadId,
const std::vector<std::pair<uint32_t, std::string>>& PartETags)
{
+ SigV4Credentials Credentials;
+ if (std::string Err = RequireCredentials(Credentials, "S3 CompleteMultipartUpload '{}' failed", Key); !Err.empty())
+ {
+ return S3Result{std::move(Err)};
+ }
+
std::string Path = KeyToPath(Key);
std::string CanonicalQS = BuildCanonicalQueryString({{"uploadId", std::string(UploadId)}});
@@ -748,7 +853,7 @@ S3Client::CompleteMultipartUpload(std::string_view Key,
std::string_view XmlView = XmlBody.ToView();
std::string PayloadHash = Sha256ToHex(ComputeSha256(XmlView));
- HttpClient::KeyValueMap Headers = SignRequest("POST", Path, CanonicalQS, PayloadHash);
+ HttpClient::KeyValueMap Headers = SignRequest(Credentials, "POST", Path, CanonicalQS, PayloadHash);
Headers->emplace("Content-Type", "application/xml");
IoBuffer Payload(IoBuffer::Clone, XmlView.data(), XmlView.size());
@@ -757,18 +862,18 @@ S3Client::CompleteMultipartUpload(std::string_view Key,
HttpClient::Response Response = m_HttpClient.Post(FullPath, Payload, Headers);
if (!Response.IsSuccess())
{
- std::string Err = Response.ErrorMessage("S3 CompleteMultipartUpload failed");
+ std::string Err = S3ErrorMessage("S3 CompleteMultipartUpload failed", Response);
ZEN_WARN("S3 CompleteMultipartUpload '{}' failed: {}", Key, Err);
return S3Result{std::move(Err)};
}
// Check for error in response body - S3 can return 200 with an error in the XML body
std::string_view ResponseBody = Response.AsText();
- if (ResponseBody.find("<Error>") != std::string_view::npos)
+ std::string_view ErrorCode;
+ std::string_view ErrorMessage;
+ if (ExtractS3Error(ResponseBody, ErrorCode, ErrorMessage))
{
- std::string_view ErrorCode = ExtractXmlValue(ResponseBody, "Code");
- std::string_view ErrorMessage = ExtractXmlValue(ResponseBody, "Message");
- std::string Err = fmt::format("S3 CompleteMultipartUpload '{}' returned error: {} - {}", Key, ErrorCode, ErrorMessage);
+ std::string Err = fmt::format("S3 CompleteMultipartUpload '{}' returned error: {} - {}", Key, ErrorCode, ErrorMessage);
ZEN_WARN("{}", Err);
return S3Result{std::move(Err)};
}
@@ -783,16 +888,22 @@ S3Client::CompleteMultipartUpload(std::string_view Key,
S3Result
S3Client::AbortMultipartUpload(std::string_view Key, std::string_view UploadId)
{
+ SigV4Credentials Credentials;
+ if (std::string Err = RequireCredentials(Credentials, "S3 AbortMultipartUpload '{}' failed", Key); !Err.empty())
+ {
+ return S3Result{std::move(Err)};
+ }
+
std::string Path = KeyToPath(Key);
std::string CanonicalQS = BuildCanonicalQueryString({{"uploadId", std::string(UploadId)}});
- HttpClient::KeyValueMap Headers = SignRequest("DELETE", Path, CanonicalQS, EmptyPayloadHash);
+ HttpClient::KeyValueMap Headers = SignRequest(Credentials, "DELETE", Path, CanonicalQS, EmptyPayloadHash);
std::string FullPath = BuildRequestPath(Path, CanonicalQS);
HttpClient::Response Response = m_HttpClient.Delete(FullPath, Headers);
if (!Response.IsSuccess())
{
- std::string Err = Response.ErrorMessage("S3 AbortMultipartUpload failed");
+ std::string Err = S3ErrorMessage("S3 AbortMultipartUpload failed", Response);
ZEN_WARN("S3 AbortMultipartUpload '{}' failed: {}", Key, Err);
return S3Result{std::move(Err)};
}
@@ -819,6 +930,12 @@ S3Client::GeneratePresignedPutUrl(std::string_view Key, std::chrono::seconds Exp
std::string
S3Client::GeneratePresignedUrlForMethod(std::string_view Key, std::string_view Method, std::chrono::seconds ExpiresIn)
{
+ SigV4Credentials Credentials;
+ if (std::string Err = RequireCredentials(Credentials, "S3 GeneratePresignedUrl '{}' {} failed", Key, Method); !Err.empty())
+ {
+ return {};
+ }
+
std::string Path = KeyToPath(Key);
std::string Scheme = "https";
@@ -827,7 +944,6 @@ S3Client::GeneratePresignedUrlForMethod(std::string_view Key, std::string_view M
Scheme = "http";
}
- SigV4Credentials Credentials = GetCurrentCredentials();
return GeneratePresignedUrl(Credentials, Method, Scheme, m_Host, Path, m_Region, "s3", ExpiresIn);
}
diff --git a/src/zenutil/include/zenutil/cloud/s3client.h b/src/zenutil/include/zenutil/cloud/s3client.h
index 4d72dd479..1ce2a768e 100644
--- a/src/zenutil/include/zenutil/cloud/s3client.h
+++ b/src/zenutil/include/zenutil/cloud/s3client.h
@@ -11,6 +11,10 @@
#include <zencore/thread.h>
+ZEN_THIRD_PARTY_INCLUDES_START
+#include <fmt/format.h>
+ZEN_THIRD_PARTY_INCLUDES_END
+
#include <functional>
#include <span>
#include <string>
@@ -207,7 +211,8 @@ private:
/// Sign a request and return headers with Authorization, x-amz-date, x-amz-content-sha256.
/// Additional x-amz-* headers that must participate in the signature are passed via
/// ExtraSignedHeaders (lowercase name, value); they are also copied into the returned map.
- HttpClient::KeyValueMap SignRequest(std::string_view Method,
+ HttpClient::KeyValueMap SignRequest(const SigV4Credentials& Credentials,
+ std::string_view Method,
std::string_view Path,
std::string_view QueryString,
std::string_view PayloadHash,
@@ -219,6 +224,23 @@ private:
/// Get the current credentials, either from the provider or from static config
SigV4Credentials GetCurrentCredentials();
+ /// Populate OutCredentials and return empty string on success; on failure return a
+ /// "<context>: no credentials available" error (also logged). Context args are only
+ /// formatted on the failure path.
+ template<typename... ContextArgs>
+ std::string RequireCredentials(SigV4Credentials& OutCredentials, fmt::format_string<ContextArgs...> ContextFmt, ContextArgs&&... Args)
+ {
+ OutCredentials = GetCurrentCredentials();
+ if (!OutCredentials.AccessKeyId.empty())
+ {
+ return {};
+ }
+ OutCredentials = {};
+ return BuildNoCredentialsError(fmt::format(ContextFmt, std::forward<ContextArgs>(Args)...));
+ }
+
+ std::string BuildNoCredentialsError(std::string Context);
+
LoggerRef m_Log;
std::string m_BucketName;
std::string m_Region;