aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2026-04-21 16:26:57 +0200
committerGitHub Enterprise <[email protected]>2026-04-21 16:26:57 +0200
commit6b59d3d37dcc6320929df2f0074f9a1cb506d1fd (patch)
tree2ddd317e381c29a97c666e9d72cf5d614a13f6f8 /src/zenserver
parentzen CLI security review fixes (#974) (diff)
downloadarchived-zen-6b59d3d37dcc6320929df2f0074f9a1cb506d1fd.tar.xz
archived-zen-6b59d3d37dcc6320929df2f0074f9a1cb506d1fd.zip
improved s3 hydration (#997)
- Improvement: Hub shares a single S3 client and IMDS credential provider across all modules, reducing IMDS load and surviving transient IMDS blips during bulk provisioning - Improvement: Hub validates hydration config at startup; bad `--hub-hydration-target-spec` or `--hub-hydration-target-config` now fails `zen hub` at boot instead of per-module at first hydrate - Improvement: S3 hydration multipart chunk size configurable via `settings.chunk-size` (default 32 MiB) - Improvement: S3 client extracts `<Error><Code>` and `<Message>` from XML error bodies (previously logged as `<unhandled content format>`) - Improvement: S3 client fails fast with a "no credentials available" error when AWS credentials are missing, instead of sending an unsigned request that S3 rejects with a generic 400 - Improvement: IMDS credential provider retries transient connection failures (up to 3 attempts with backoff) - Improvement: HTTP clients with `RetryCount > 0` also retry on `CURLE_COULDNT_CONNECT`
Diffstat (limited to 'src/zenserver')
-rw-r--r--src/zenserver/hub/hub.cpp41
-rw-r--r--src/zenserver/hub/hub.h5
-rw-r--r--src/zenserver/hub/hydration.cpp2137
-rw-r--r--src/zenserver/hub/hydration.h49
-rw-r--r--src/zenserver/hub/storageserverinstance.cpp20
-rw-r--r--src/zenserver/hub/storageserverinstance.h8
6 files changed, 1111 insertions, 1149 deletions
diff --git a/src/zenserver/hub/hub.cpp b/src/zenserver/hub/hub.cpp
index 128d3ed35..3fbb7a26e 100644
--- a/src/zenserver/hub/hub.cpp
+++ b/src/zenserver/hub/hub.cpp
@@ -183,20 +183,22 @@ Hub::Hub(const Configuration& Config, ZenServerEnvironment&& RunEnvironment, Asy
Config.OptionalProvisionWorkerPool != Config.OptionalHydrationWorkerPool || Config.OptionalProvisionWorkerPool == nullptr,
"Provision and hydration worker pools must be distinct to avoid deadlocks");
+ HydrationBase::Configuration HydrationConfig;
if (!m_Config.HydrationTargetSpecification.empty())
{
- m_HydrationTargetSpecification = m_Config.HydrationTargetSpecification;
+ HydrationConfig.TargetSpecification = m_Config.HydrationTargetSpecification;
}
else if (!m_Config.HydrationOptions)
{
std::filesystem::path FileHydrationPath = m_RunEnvironment.CreateChildDir("hydration_storage");
ZEN_INFO("using file hydration path: '{}'", FileHydrationPath);
- m_HydrationTargetSpecification = fmt::format("file://{}", WideToUtf8(FileHydrationPath.native()));
+ HydrationConfig.TargetSpecification = fmt::format("file://{}", WideToUtf8(FileHydrationPath.native()));
}
else
{
- m_HydrationOptions = m_Config.HydrationOptions;
+ HydrationConfig.Options = m_Config.HydrationOptions;
}
+ m_Hydration = InitHydration(HydrationConfig);
m_HydrationTempPath = m_RunEnvironment.CreateChildDir("hydration_temp");
ZEN_INFO("using hydration temp path: '{}'", m_HydrationTempPath);
@@ -324,19 +326,18 @@ Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo)
{
auto NewInstance = std::make_unique<StorageServerInstance>(
m_RunEnvironment,
- StorageServerInstance::Configuration{.BasePort = GetInstanceIndexAssignedPort(ActiveInstanceIndex),
- .StateDir = m_RunEnvironment.CreateChildDir(ModuleId),
- .TempDir = m_HydrationTempPath / ModuleId,
- .HydrationTargetSpecification = m_HydrationTargetSpecification,
- .HydrationOptions = m_HydrationOptions,
- .HttpThreadCount = m_Config.InstanceHttpThreadCount,
- .CoreLimit = m_Config.InstanceCoreLimit,
- .ConfigPath = m_Config.InstanceConfigPath,
- .Malloc = m_Config.InstanceMalloc,
- .Trace = m_Config.InstanceTrace,
- .TraceHost = m_Config.InstanceTraceHost,
- .TraceFile = m_Config.InstanceTraceFile,
- .OptionalWorkerPool = m_Config.OptionalHydrationWorkerPool},
+ *m_Hydration,
+ StorageServerInstance::Configuration{.BasePort = GetInstanceIndexAssignedPort(ActiveInstanceIndex),
+ .StateDir = m_RunEnvironment.CreateChildDir(ModuleId),
+ .TempDir = m_HydrationTempPath / ModuleId,
+ .HttpThreadCount = m_Config.InstanceHttpThreadCount,
+ .CoreLimit = m_Config.InstanceCoreLimit,
+ .ConfigPath = m_Config.InstanceConfigPath,
+ .Malloc = m_Config.InstanceMalloc,
+ .Trace = m_Config.InstanceTrace,
+ .TraceHost = m_Config.InstanceTraceHost,
+ .TraceFile = m_Config.InstanceTraceFile,
+ .OptionalWorkerPool = m_Config.OptionalHydrationWorkerPool},
ModuleId);
#if ZEN_PLATFORM_WINDOWS
@@ -1216,11 +1217,7 @@ Hub::ObliterateBackendData(std::string_view ModuleId)
std::atomic<bool> AbortFlag{false};
std::atomic<bool> PauseFlag{false};
- HydrationConfig Config{.ServerStateDir = ServerStateDir,
- .TempDir = TempDir,
- .ModuleId = std::string(ModuleId),
- .TargetSpecification = m_HydrationTargetSpecification,
- .Options = m_HydrationOptions};
+ HydrationConfig Config{.ServerStateDir = ServerStateDir, .TempDir = TempDir, .ModuleId = std::string(ModuleId)};
if (m_Config.OptionalHydrationWorkerPool)
{
Config.Threading.emplace(HydrationConfig::ThreadingOptions{.WorkerPool = m_Config.OptionalHydrationWorkerPool,
@@ -1228,7 +1225,7 @@ Hub::ObliterateBackendData(std::string_view ModuleId)
.PauseFlag = &PauseFlag});
}
- std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
+ std::unique_ptr<HydrationStrategyBase> Hydrator = m_Hydration->CreateHydrator(Config);
Hydrator->Obliterate();
}
diff --git a/src/zenserver/hub/hub.h b/src/zenserver/hub/hub.h
index 040f34af5..40d046ce0 100644
--- a/src/zenserver/hub/hub.h
+++ b/src/zenserver/hub/hub.h
@@ -199,9 +199,8 @@ private:
AsyncModuleStateChangeCallbackFunc m_ModuleStateChangeCallback;
- std::string m_HydrationTargetSpecification;
- CbObject m_HydrationOptions;
- std::filesystem::path m_HydrationTempPath;
+ std::unique_ptr<HydrationBase> m_Hydration;
+ std::filesystem::path m_HydrationTempPath;
#if ZEN_PLATFORM_WINDOWS
JobObject m_JobObject;
diff --git a/src/zenserver/hub/hydration.cpp b/src/zenserver/hub/hydration.cpp
index 266296fa9..4b2294b49 100644
--- a/src/zenserver/hub/hydration.cpp
+++ b/src/zenserver/hub/hydration.cpp
@@ -79,15 +79,14 @@ namespace hydration_impl {
CleanDirectory(WorkerPool, AbortFlag, PauseFlag, Path, std::vector<std::string>{}, {}, 0);
}
+ ///////////////////////////////////////////////////////////////////////
+ // Per-module storage interface driven by IncrementalHydrator.
+
class StorageBase
{
public:
- virtual ~StorageBase() {}
+ virtual ~StorageBase() = default;
- virtual void Configure(std::string_view ModuleId,
- const std::filesystem::path& TempDir,
- std::string_view TargetSpecification,
- const CbObject& Options) = 0;
virtual void SaveMetadata(const CbObject& Data) = 0;
virtual CbObject LoadMetadata() = 0;
virtual CbObject GetSettings() = 0;
@@ -107,141 +106,31 @@ namespace hydration_impl {
virtual void Delete(ParallelWork& Work, WorkerThreadPool& WorkerPool) = 0;
};
- constexpr std::string_view FileHydratorPrefix = "file://";
- constexpr std::string_view FileHydratorType = "file";
-
- constexpr std::string_view S3HydratorPrefix = "s3://";
- constexpr std::string_view S3HydratorType = "s3";
-
class FileStorage : public StorageBase
{
public:
- FileStorage() {}
- virtual void Configure(std::string_view ModuleId,
- const std::filesystem::path& TempDir,
- std::string_view TargetSpecification,
- const CbObject& Options)
- {
- ZEN_UNUSED(TempDir);
- if (!TargetSpecification.empty())
- {
- m_StoragePath = Utf8ToWide(TargetSpecification.substr(FileHydratorPrefix.length()));
- if (m_StoragePath.empty())
- {
- throw zen::runtime_error("Hydration config 'file' type requires a directory path");
- }
- }
- else
- {
- CbObjectView Settings = Options["settings"].AsObjectView();
- std::string_view Path = Settings["path"].AsString();
- if (Path.empty())
- {
- throw zen::runtime_error("Hydration config 'file' type requires 'settings.path'");
- }
- m_StoragePath = Utf8ToWide(std::string(Path));
- }
- m_StoragePath = m_StoragePath / ModuleId;
- MakeSafeAbsolutePathInPlace(m_StoragePath);
-
- m_StatePathName = m_StoragePath / "current-state.cbo";
- m_CASPath = m_StoragePath / "cas";
- CreateDirectories(m_CASPath);
- }
- virtual void SaveMetadata(const CbObject& Data)
- {
- BinaryWriter Output;
- SaveCompactBinary(Output, Data);
- WriteFile(m_StatePathName, IoBuffer(IoBuffer::Wrap, Output.GetData(), Output.GetSize()));
- }
- virtual CbObject LoadMetadata()
- {
- if (!IsFile(m_StatePathName))
- {
- return {};
- }
- FileContents Content = ReadFile(m_StatePathName);
- if (Content.ErrorCode)
- {
- ThrowSystemError(Content.ErrorCode.value(), "Failed to read state file");
- }
- IoBuffer Payload = Content.Flatten();
- CbValidateError Error;
- CbObject Result = ValidateAndReadCompactBinaryObject(std::move(Payload), Error);
- if (Error != CbValidateError::None)
- {
- throw std::runtime_error(fmt::format("Failed to read {} state file. Reason: {}", m_StatePathName, ToString(Error)));
- }
- return Result;
- }
-
- virtual CbObject GetSettings() override { return {}; }
- virtual void ParseSettings(const CbObjectView& Settings) { ZEN_UNUSED(Settings); }
-
- virtual std::vector<IoHash> List()
- {
- DirectoryContent DirContent;
- GetDirectoryContent(m_CASPath, DirectoryContentFlags::IncludeFiles, DirContent);
- std::vector<IoHash> Result;
- Result.reserve(DirContent.Files.size());
- for (const std::filesystem::path& Path : DirContent.Files)
- {
- IoHash Hash;
- if (IoHash::TryParse(Path.filename().string(), Hash))
- {
- Result.push_back(Hash);
- }
- }
- return Result;
- }
-
- virtual void Put(ParallelWork& Work,
- WorkerThreadPool& WorkerPool,
- const IoHash& Hash,
- uint64_t Size,
- const std::filesystem::path& SourcePath)
- {
- ZEN_UNUSED(Size);
- Work.ScheduleWork(WorkerPool,
- [this, Hash = IoHash(Hash), SourcePath = std::filesystem::path(SourcePath)](std::atomic<bool>& AbortFlag) {
- if (!AbortFlag.load())
- {
- CopyFile(SourcePath, m_CASPath / fmt::format("{}", Hash), CopyFileOptions{.EnableClone = true});
- }
- });
- }
+ static constexpr std::string_view Prefix = "file://";
+ static constexpr std::string_view Type = "file";
- virtual void Get(ParallelWork& Work,
- WorkerThreadPool& WorkerPool,
- const IoHash& Hash,
- uint64_t Size,
- const std::filesystem::path& DestinationPath)
- {
- ZEN_UNUSED(Size);
- Work.ScheduleWork(
- WorkerPool,
- [this, Hash = IoHash(Hash), DestinationPath = std::filesystem::path(DestinationPath)](std::atomic<bool>& AbortFlag) {
- if (!AbortFlag.load())
- {
- CopyFile(m_CASPath / fmt::format("{}", Hash), DestinationPath, CopyFileOptions{.EnableClone = true});
- }
- });
- }
+ explicit FileStorage(std::filesystem::path ModulePath);
- virtual void Touch(ParallelWork& Work, WorkerThreadPool& WorkerPool, const IoHash& Hash) override
- {
- // Local filesystem backend does not use modification-time-based retention, so no refresh is needed.
- ZEN_UNUSED(Work);
- ZEN_UNUSED(WorkerPool);
- ZEN_UNUSED(Hash);
- }
-
- virtual void Delete(ParallelWork& Work, WorkerThreadPool& WorkerPool) override
- {
- ZEN_UNUSED(Work);
- ZEN_UNUSED(WorkerPool);
- DeleteDirectories(m_StoragePath);
- }
+ virtual void SaveMetadata(const CbObject& Data) override;
+ virtual CbObject LoadMetadata() override;
+ virtual CbObject GetSettings() override { return {}; }
+ virtual void ParseSettings(const CbObjectView&) override {}
+ virtual std::vector<IoHash> List() override;
+ virtual void Put(ParallelWork& Work,
+ WorkerThreadPool& WorkerPool,
+ const IoHash& Hash,
+ uint64_t Size,
+ const std::filesystem::path& SourcePath) override;
+ virtual void Get(ParallelWork& Work,
+ WorkerThreadPool& WorkerPool,
+ const IoHash& Hash,
+ uint64_t Size,
+ const std::filesystem::path& DestinationPath) override;
+ virtual void Touch(ParallelWork&, WorkerThreadPool&, const IoHash&) override {}
+ virtual void Delete(ParallelWork& Work, WorkerThreadPool& WorkerPool) override;
private:
std::filesystem::path m_StoragePath;
@@ -252,876 +141,1058 @@ namespace hydration_impl {
class S3Storage : public StorageBase
{
public:
- S3Storage() {}
+ static constexpr std::string_view Prefix = "s3://";
+ static constexpr std::string_view Type = "s3";
+ static constexpr uint64_t DefaultMultipartChunkSize = 32u * 1024u * 1024u;
- virtual void Configure(std::string_view ModuleId,
- const std::filesystem::path& TempDir,
- std::string_view TargetSpecification,
- const CbObject& Options)
- {
- m_Options = Options;
+ S3Storage(S3Client& Client, std::string KeyPrefix, std::filesystem::path TempDir, uint64_t MultipartChunkSize);
- CbObjectView Settings = m_Options["settings"].AsObjectView();
- std::string_view Spec;
- if (!TargetSpecification.empty())
- {
- Spec = TargetSpecification;
- Spec.remove_prefix(S3HydratorPrefix.size());
- }
- else
- {
- std::string_view Uri = Settings["uri"].AsString();
- if (Uri.empty())
- {
- throw zen::runtime_error("Incremental S3 hydration config requires 'settings.uri'");
- }
- Spec = Uri;
- Spec.remove_prefix(S3HydratorPrefix.size());
- }
+ virtual void SaveMetadata(const CbObject& Data) override;
+ virtual CbObject LoadMetadata() override;
+ virtual CbObject GetSettings() override;
+ virtual void ParseSettings(const CbObjectView& Settings) override;
+ virtual std::vector<IoHash> List() override;
+ virtual void Put(ParallelWork& Work,
+ WorkerThreadPool& WorkerPool,
+ const IoHash& Hash,
+ uint64_t Size,
+ const std::filesystem::path& SourcePath) override;
+ virtual void Get(ParallelWork& Work,
+ WorkerThreadPool& WorkerPool,
+ const IoHash& Hash,
+ uint64_t Size,
+ const std::filesystem::path& DestinationPath) override;
+ virtual void Touch(ParallelWork& Work, WorkerThreadPool& WorkerPool, const IoHash& Hash) override;
+ virtual void Delete(ParallelWork& Work, WorkerThreadPool& WorkerPool) override;
- size_t SlashPos = Spec.find('/');
- std::string UserPrefix = SlashPos != std::string_view::npos ? std::string(Spec.substr(SlashPos + 1)) : std::string{};
- m_Bucket = std::string(SlashPos != std::string_view::npos ? Spec.substr(0, SlashPos) : Spec);
- m_KeyPrefix = UserPrefix.empty() ? std::string(ModuleId) : UserPrefix + "/" + std::string(ModuleId);
+ private:
+ S3Client& m_Client;
+ std::string m_KeyPrefix;
+ std::filesystem::path m_TempDir;
+ uint64_t m_MultipartChunkSize;
+ };
- ZEN_ASSERT(!m_Bucket.empty());
+ ///////////////////////////////////////////////////////////////////////
+ // FileStorage implementations
- std::string Region = std::string(Settings["region"].AsString());
- if (Region.empty())
- {
- Region = GetEnvVariable("AWS_DEFAULT_REGION");
- }
- if (Region.empty())
- {
- Region = GetEnvVariable("AWS_REGION");
- }
- if (Region.empty())
- {
- Region = "us-east-1";
- }
- m_Region = std::move(Region);
+ FileStorage::FileStorage(std::filesystem::path ModulePath) : m_StoragePath(std::move(ModulePath))
+ {
+ MakeSafeAbsolutePathInPlace(m_StoragePath);
+ m_StatePathName = m_StoragePath / "current-state.cbo";
+ m_CASPath = m_StoragePath / "cas";
+ CreateDirectories(m_CASPath);
+ }
- std::string AccessKeyId = GetEnvVariable("AWS_ACCESS_KEY_ID");
- if (AccessKeyId.empty())
- {
- m_CredentialProvider = Ref<ImdsCredentialProvider>(new ImdsCredentialProvider({}));
- }
- else
- {
- m_Credentials.AccessKeyId = std::move(AccessKeyId);
- m_Credentials.SecretAccessKey = GetEnvVariable("AWS_SECRET_ACCESS_KEY");
- m_Credentials.SessionToken = GetEnvVariable("AWS_SESSION_TOKEN");
- }
- m_TempDir = TempDir;
- m_Client = CreateS3Client();
- }
+ void FileStorage::SaveMetadata(const CbObject& Data)
+ {
+ BinaryWriter Output;
+ SaveCompactBinary(Output, Data);
+ WriteFile(m_StatePathName, IoBuffer(IoBuffer::Wrap, Output.GetData(), Output.GetSize()));
+ }
- virtual void SaveMetadata(const CbObject& Data)
+ CbObject FileStorage::LoadMetadata()
+ {
+ if (!IsFile(m_StatePathName))
+ {
+ return {};
+ }
+ FileContents Content = ReadFile(m_StatePathName);
+ if (Content.ErrorCode)
+ {
+ ThrowSystemError(Content.ErrorCode.value(), "Failed to read state file");
+ }
+ IoBuffer Payload = Content.Flatten();
+ CbValidateError Error;
+ CbObject Result = ValidateAndReadCompactBinaryObject(std::move(Payload), Error);
+ if (Error != CbValidateError::None)
{
- S3Client& Client = *m_Client;
- BinaryWriter Output;
- SaveCompactBinary(Output, Data);
- IoBuffer Payload(IoBuffer::Clone, Output.GetData(), Output.GetSize());
+ throw std::runtime_error(fmt::format("Failed to read {} state file. Reason: {}", m_StatePathName, ToString(Error)));
+ }
+ return Result;
+ }
- std::string Key = m_KeyPrefix + "/incremental-state.cbo";
- S3Result Result = Client.PutObject(Key, std::move(Payload));
- if (!Result.IsSuccess())
+ std::vector<IoHash> FileStorage::List()
+ {
+ DirectoryContent DirContent;
+ GetDirectoryContent(m_CASPath, DirectoryContentFlags::IncludeFiles, DirContent);
+ std::vector<IoHash> Result;
+ Result.reserve(DirContent.Files.size());
+ for (const std::filesystem::path& Path : DirContent.Files)
+ {
+ IoHash Hash;
+ if (IoHash::TryParse(Path.filename().string(), Hash))
{
- throw zen::runtime_error("Failed to save incremental metadata to '{}': {}", Key, Result.Error);
+ Result.push_back(Hash);
}
}
+ return Result;
+ }
- virtual CbObject LoadMetadata()
- {
- S3Client& Client = *m_Client;
- std::string Key = m_KeyPrefix + "/incremental-state.cbo";
- S3GetObjectResult Result = Client.GetObject(Key);
- if (!Result.IsSuccess())
- {
- if (Result.Error == S3GetObjectResult::NotFoundErrorText)
+ void FileStorage::Put(ParallelWork& Work,
+ WorkerThreadPool& WorkerPool,
+ const IoHash& Hash,
+ uint64_t Size,
+ const std::filesystem::path& SourcePath)
+ {
+ ZEN_UNUSED(Size);
+ Work.ScheduleWork(WorkerPool,
+ [this, Hash = IoHash(Hash), SourcePath = std::filesystem::path(SourcePath)](std::atomic<bool>& AbortFlag) {
+ if (!AbortFlag.load())
+ {
+ CopyFile(SourcePath, m_CASPath / fmt::format("{}", Hash), CopyFileOptions{.EnableClone = true});
+ }
+ });
+ }
+
+ void FileStorage::Get(ParallelWork& Work,
+ WorkerThreadPool& WorkerPool,
+ const IoHash& Hash,
+ uint64_t Size,
+ const std::filesystem::path& DestinationPath)
+ {
+ ZEN_UNUSED(Size);
+ Work.ScheduleWork(
+ WorkerPool,
+ [this, Hash = IoHash(Hash), DestinationPath = std::filesystem::path(DestinationPath)](std::atomic<bool>& AbortFlag) {
+ if (!AbortFlag.load())
{
- return {};
+ CopyFile(m_CASPath / fmt::format("{}", Hash), DestinationPath, CopyFileOptions{.EnableClone = true});
}
- throw zen::runtime_error("Failed to load incremental metadata from '{}': {}", Key, Result.Error);
- }
+ });
+ }
+
+ void FileStorage::Delete(ParallelWork& Work, WorkerThreadPool& WorkerPool)
+ {
+ ZEN_UNUSED(Work);
+ ZEN_UNUSED(WorkerPool);
+ DeleteDirectories(m_StoragePath);
+ }
- CbValidateError Error;
- CbObject Meta = ValidateAndReadCompactBinaryObject(std::move(Result.Content), Error);
- if (Error != CbValidateError::None)
+ ///////////////////////////////////////////////////////////////////////
+ // S3Storage implementations
+
+ S3Storage::S3Storage(S3Client& Client, std::string KeyPrefix, std::filesystem::path TempDir, uint64_t MultipartChunkSize)
+ : m_Client(Client)
+ , m_KeyPrefix(std::move(KeyPrefix))
+ , m_TempDir(std::move(TempDir))
+ , m_MultipartChunkSize(MultipartChunkSize)
+ {
+ }
+
+ void S3Storage::SaveMetadata(const CbObject& Data)
+ {
+ BinaryWriter Output;
+ SaveCompactBinary(Output, Data);
+ IoBuffer Payload(IoBuffer::Clone, Output.GetData(), Output.GetSize());
+
+ std::string Key = m_KeyPrefix + "/incremental-state.cbo";
+ S3Result Result = m_Client.PutObject(Key, std::move(Payload));
+ if (!Result.IsSuccess())
+ {
+ throw zen::runtime_error("Failed to save incremental metadata to '{}': {}", Key, Result.Error);
+ }
+ }
+
+ CbObject S3Storage::LoadMetadata()
+ {
+ std::string Key = m_KeyPrefix + "/incremental-state.cbo";
+ S3GetObjectResult Result = m_Client.GetObject(Key);
+ if (!Result.IsSuccess())
+ {
+ if (Result.Error == S3GetObjectResult::NotFoundErrorText)
{
- throw zen::runtime_error("Failed to parse incremental metadata from '{}': {}", Key, ToString(Error));
+ return {};
}
- return Meta;
+ throw zen::runtime_error("Failed to load incremental metadata from '{}': {}", Key, Result.Error);
}
- virtual CbObject GetSettings() override
+ CbValidateError Error;
+ CbObject Meta = ValidateAndReadCompactBinaryObject(std::move(Result.Content), Error);
+ if (Error != CbValidateError::None)
{
- CbObjectWriter Writer;
- Writer << "MultipartChunkSize" << m_MultipartChunkSize;
- return Writer.Save();
+ throw zen::runtime_error("Failed to parse incremental metadata from '{}': {}", Key, ToString(Error));
}
+ return Meta;
+ }
- virtual void ParseSettings(const CbObjectView& Settings)
+ CbObject S3Storage::GetSettings()
+ {
+ CbObjectWriter Writer;
+ Writer << "MultipartChunkSize" << m_MultipartChunkSize;
+ return Writer.Save();
+ }
+
+ void S3Storage::ParseSettings(const CbObjectView& Settings)
+ {
+ m_MultipartChunkSize = Settings["MultipartChunkSize"].AsUInt64(S3Storage::DefaultMultipartChunkSize);
+ }
+
+ std::vector<IoHash> S3Storage::List()
+ {
+ std::string CasPrefix = m_KeyPrefix + "/cas/";
+ S3ListObjectsResult Result = m_Client.ListObjects(CasPrefix);
+ if (!Result.IsSuccess())
{
- m_MultipartChunkSize = Settings["MultipartChunkSize"].AsUInt64(DefaultMultipartChunkSize);
+ throw zen::runtime_error("Failed to list S3 objects under '{}': {}", CasPrefix, Result.Error);
}
- virtual std::vector<IoHash> List()
+ std::vector<IoHash> Hashes;
+ Hashes.reserve(Result.Objects.size());
+ for (const S3ObjectInfo& Obj : Result.Objects)
{
- S3Client& Client = *m_Client;
- std::string Prefix = m_KeyPrefix + "/cas/";
- S3ListObjectsResult Result = Client.ListObjects(Prefix);
- if (!Result.IsSuccess())
+ size_t LastSlash = Obj.Key.rfind('/');
+ if (LastSlash == std::string::npos)
{
- throw zen::runtime_error("Failed to list S3 objects under '{}': {}", Prefix, Result.Error);
+ continue;
}
-
- std::vector<IoHash> Hashes;
- Hashes.reserve(Result.Objects.size());
- for (const S3ObjectInfo& Obj : Result.Objects)
+ IoHash Hash;
+ if (IoHash::TryParse(Obj.Key.substr(LastSlash + 1), Hash))
{
- size_t LastSlash = Obj.Key.rfind('/');
- if (LastSlash == std::string::npos)
- {
- continue;
- }
- IoHash Hash;
- if (IoHash::TryParse(Obj.Key.substr(LastSlash + 1), Hash))
- {
- Hashes.push_back(Hash);
- }
+ Hashes.push_back(Hash);
}
- return Hashes;
}
+ return Hashes;
+ }
- virtual void Put(ParallelWork& Work,
- WorkerThreadPool& WorkerPool,
- const IoHash& Hash,
- uint64_t Size,
- const std::filesystem::path& SourcePath)
- {
- Work.ScheduleWork(
- WorkerPool,
- [this, Hash = IoHash(Hash), Size, SourcePath = std::filesystem::path(SourcePath)](std::atomic<bool>& AbortFlag) {
- if (AbortFlag.load())
- {
- return;
- }
- S3Client& Client = *m_Client;
- std::string Key = m_KeyPrefix + "/cas/" + fmt::format("{}", Hash);
+ void S3Storage::Put(ParallelWork& Work,
+ WorkerThreadPool& WorkerPool,
+ const IoHash& Hash,
+ uint64_t Size,
+ const std::filesystem::path& SourcePath)
+ {
+ Work.ScheduleWork(WorkerPool,
+ [this, Hash = IoHash(Hash), Size, SourcePath = std::filesystem::path(SourcePath)](std::atomic<bool>& AbortFlag) {
+ if (AbortFlag.load())
+ {
+ return;
+ }
+ S3Client& Client = m_Client;
+ std::string Key = m_KeyPrefix + "/cas/" + fmt::format("{}", Hash);
+
+ if (Size >= (m_MultipartChunkSize + (m_MultipartChunkSize / 4)))
+ {
+ BasicFile File(SourcePath, BasicFile::Mode::kRead);
+ S3Result Result = Client.PutObjectMultipart(
+ Key,
+ Size,
+ [&File](uint64_t Offset, uint64_t ChunkSize) { return File.ReadRange(Offset, ChunkSize); },
+ m_MultipartChunkSize);
+ if (!Result.IsSuccess())
+ {
+ throw zen::runtime_error("Failed to upload '{}' to S3: {}", Key, Result.Error);
+ }
+ }
+ else
+ {
+ BasicFile File(SourcePath, BasicFile::Mode::kRead);
+ S3Result Result = Client.PutObject(Key, File.ReadAll());
+ if (!Result.IsSuccess())
+ {
+ throw zen::runtime_error("Failed to upload '{}' to S3: {}", Key, Result.Error);
+ }
+ }
+ });
+ }
- if (Size >= (m_MultipartChunkSize + (m_MultipartChunkSize / 4)))
- {
- BasicFile File(SourcePath, BasicFile::Mode::kRead);
- S3Result Result = Client.PutObjectMultipart(
- Key,
- Size,
- [&File](uint64_t Offset, uint64_t ChunkSize) { return File.ReadRange(Offset, ChunkSize); },
- m_MultipartChunkSize);
- if (!Result.IsSuccess())
- {
- throw zen::runtime_error("Failed to upload '{}' to S3: {}", Key, Result.Error);
- }
- }
- else
- {
- BasicFile File(SourcePath, BasicFile::Mode::kRead);
- S3Result Result = Client.PutObject(Key, File.ReadAll());
- if (!Result.IsSuccess())
- {
- throw zen::runtime_error("Failed to upload '{}' to S3: {}", Key, Result.Error);
- }
- }
- });
- }
+ void S3Storage::Get(ParallelWork& Work,
+ WorkerThreadPool& WorkerPool,
+ const IoHash& Hash,
+ uint64_t Size,
+ const std::filesystem::path& DestinationPath)
+ {
+ std::string Key = m_KeyPrefix + "/cas/" + fmt::format("{}", Hash);
- virtual void Get(ParallelWork& Work,
- WorkerThreadPool& WorkerPool,
- const IoHash& Hash,
- uint64_t Size,
- const std::filesystem::path& DestinationPath)
+ if (Size >= (m_MultipartChunkSize + (m_MultipartChunkSize / 4)))
{
- std::string Key = m_KeyPrefix + "/cas/" + fmt::format("{}", Hash);
-
- if (Size >= (m_MultipartChunkSize + (m_MultipartChunkSize / 4)))
+ class WorkData
{
- class WorkData
- {
- public:
- WorkData(const std::filesystem::path& DestPath, uint64_t Size) : m_DestFile(DestPath, BasicFile::Mode::kTruncate)
- {
- PrepareFileForScatteredWrite(m_DestFile.Handle(), Size);
- }
- ~WorkData() { m_DestFile.Flush(); }
- void Write(const void* Data, uint64_t Size, uint64_t Offset) { m_DestFile.Write(Data, Size, Offset); }
-
- private:
- BasicFile m_DestFile;
- };
-
- std::shared_ptr<WorkData> Data = std::make_shared<WorkData>(DestinationPath, Size);
-
- uint64_t Offset = 0;
- while (Offset < Size)
+ public:
+ WorkData(const std::filesystem::path& DestPath, uint64_t Size) : m_DestFile(DestPath, BasicFile::Mode::kTruncate)
{
- uint64_t ChunkSize = std::min<uint64_t>(m_MultipartChunkSize, Size - Offset);
-
- Work.ScheduleWork(WorkerPool, [this, Key = Key, Offset, ChunkSize, Data](std::atomic<bool>& AbortFlag) {
- if (AbortFlag)
- {
- return;
- }
- S3GetObjectResult Chunk = m_Client->GetObjectRange(Key, Offset, ChunkSize);
- if (!Chunk.IsSuccess())
- {
- throw zen::runtime_error("Failed to download '{}' bytes [{}-{}] from S3: {}",
- Key,
- Offset,
- Offset + ChunkSize - 1,
- Chunk.Error);
- }
-
- Data->Write(Chunk.Content.GetData(), Chunk.Content.GetSize(), Offset);
- });
- Offset += ChunkSize;
+ PrepareFileForScatteredWrite(m_DestFile.Handle(), Size);
}
- }
- else
- {
- Work.ScheduleWork(
- WorkerPool,
- [this, Key = Key, DestinationPath = std::filesystem::path(DestinationPath)](std::atomic<bool>& AbortFlag) {
- if (AbortFlag)
- {
- return;
- }
- S3GetObjectResult Chunk = m_Client->GetObject(Key, m_TempDir);
- if (!Chunk.IsSuccess())
- {
- throw zen::runtime_error("Failed to download '{}' from S3: {}", Key, Chunk.Error);
- }
+ ~WorkData() { m_DestFile.Flush(); }
+ void Write(const void* Data, uint64_t Size, uint64_t Offset) { m_DestFile.Write(Data, Size, Offset); }
- if (IoBufferFileReference FileRef; Chunk.Content.GetFileReference(FileRef))
- {
- std::error_code Ec;
- std::filesystem::path ChunkPath = PathFromHandle(FileRef.FileHandle, Ec);
- if (Ec)
- {
- WriteFile(DestinationPath, Chunk.Content);
- }
- else
- {
- Chunk.Content.SetDeleteOnClose(false);
- Chunk.Content = {};
- RenameFile(ChunkPath, DestinationPath, Ec);
- if (Ec)
- {
- Chunk.Content = IoBufferBuilder::MakeFromFile(ChunkPath);
- Chunk.Content.SetDeleteOnClose(true);
- WriteFile(DestinationPath, Chunk.Content);
- }
- }
- }
- else
- {
- WriteFile(DestinationPath, Chunk.Content);
- }
- });
- }
- }
+ private:
+ BasicFile m_DestFile;
+ };
- virtual void Touch(ParallelWork& Work, WorkerThreadPool& WorkerPool, const IoHash& Hash) override
- {
- Work.ScheduleWork(WorkerPool, [this, Hash = IoHash(Hash)](std::atomic<bool>& AbortFlag) {
- if (AbortFlag.load())
- {
- return;
- }
- std::string Key = m_KeyPrefix + "/cas/" + fmt::format("{}", Hash);
- S3Result Result = m_Client->Touch(Key);
- if (!Result.IsSuccess())
- {
- throw zen::runtime_error("Failed to touch '{}' in S3: {}", Key, Result.Error);
- }
- });
- }
+ std::shared_ptr<WorkData> Data = std::make_shared<WorkData>(DestinationPath, Size);
- virtual void Delete(ParallelWork& Work, WorkerThreadPool& WorkerPool) override
- {
- std::string Prefix = m_KeyPrefix + "/";
- S3ListObjectsResult ListResult = m_Client->ListObjects(Prefix);
- if (!ListResult.IsSuccess())
+ uint64_t Offset = 0;
+ while (Offset < Size)
{
- throw zen::runtime_error("Failed to list S3 objects for deletion under '{}': {}", Prefix, ListResult.Error);
- }
- for (const S3ObjectInfo& Obj : ListResult.Objects)
- {
- Work.ScheduleWork(WorkerPool, [this, Key = Obj.Key](std::atomic<bool>& AbortFlag) {
- if (AbortFlag.load())
+ uint64_t ChunkSize = std::min<uint64_t>(m_MultipartChunkSize, Size - Offset);
+
+ Work.ScheduleWork(WorkerPool, [this, Key = Key, Offset, ChunkSize, Data](std::atomic<bool>& AbortFlag) {
+ if (AbortFlag)
{
return;
}
- S3Result DelResult = m_Client->DeleteObject(Key);
- if (!DelResult.IsSuccess())
+ S3GetObjectResult Chunk = m_Client.GetObjectRange(Key, Offset, ChunkSize);
+ if (!Chunk.IsSuccess())
{
- throw zen::runtime_error("Failed to delete S3 object '{}': {}", Key, DelResult.Error);
+ throw zen::runtime_error("Failed to download '{}' bytes [{}-{}] from S3: {}",
+ Key,
+ Offset,
+ Offset + ChunkSize - 1,
+ Chunk.Error);
}
+
+ Data->Write(Chunk.Content.GetData(), Chunk.Content.GetSize(), Offset);
});
+ Offset += ChunkSize;
}
}
-
- private:
- std::unique_ptr<S3Client> CreateS3Client() const
+ else
{
- S3ClientOptions Options;
- Options.BucketName = m_Bucket;
- Options.Region = m_Region;
+ Work.ScheduleWork(WorkerPool,
+ [this, Key = Key, DestinationPath = std::filesystem::path(DestinationPath)](std::atomic<bool>& AbortFlag) {
+ if (AbortFlag)
+ {
+ return;
+ }
+ S3GetObjectResult Chunk = m_Client.GetObject(Key, m_TempDir);
+ if (!Chunk.IsSuccess())
+ {
+ throw zen::runtime_error("Failed to download '{}' from S3: {}", Key, Chunk.Error);
+ }
- CbObjectView Settings = m_Options["settings"].AsObjectView();
- std::string_view Endpoint = Settings["endpoint"].AsString();
- if (!Endpoint.empty())
- {
- Options.Endpoint = std::string(Endpoint);
- Options.PathStyle = Settings["path-style"].AsBool();
- }
+ if (IoBufferFileReference FileRef; Chunk.Content.GetFileReference(FileRef))
+ {
+ std::error_code Ec;
+ std::filesystem::path ChunkPath = PathFromHandle(FileRef.FileHandle, Ec);
+ if (Ec)
+ {
+ WriteFile(DestinationPath, Chunk.Content);
+ }
+ else
+ {
+ Chunk.Content.SetDeleteOnClose(false);
+ Chunk.Content = {};
+ RenameFile(ChunkPath, DestinationPath, Ec);
+ if (Ec)
+ {
+ Chunk.Content = IoBufferBuilder::MakeFromFile(ChunkPath);
+ Chunk.Content.SetDeleteOnClose(true);
+ WriteFile(DestinationPath, Chunk.Content);
+ }
+ }
+ }
+ else
+ {
+ WriteFile(DestinationPath, Chunk.Content);
+ }
+ });
+ }
+ }
- if (m_CredentialProvider)
+ void S3Storage::Touch(ParallelWork& Work, WorkerThreadPool& WorkerPool, const IoHash& Hash)
+ {
+ Work.ScheduleWork(WorkerPool, [this, Hash = IoHash(Hash)](std::atomic<bool>& AbortFlag) {
+ if (AbortFlag.load())
{
- Options.CredentialProvider = m_CredentialProvider;
+ return;
}
- else
+ std::string Key = m_KeyPrefix + "/cas/" + fmt::format("{}", Hash);
+ S3Result Result = m_Client.Touch(Key);
+ if (!Result.IsSuccess())
{
- Options.Credentials = m_Credentials;
+ throw zen::runtime_error("Failed to touch '{}' in S3: {}", Key, Result.Error);
}
+ });
+ }
- Options.HttpSettings.MaximumInMemoryDownloadSize = 16u * 1024u;
-
- return std::make_unique<S3Client>(Options);
+ void S3Storage::Delete(ParallelWork& Work, WorkerThreadPool& WorkerPool)
+ {
+ std::string ModulePrefix = m_KeyPrefix + "/";
+ S3ListObjectsResult ListResult = m_Client.ListObjects(ModulePrefix);
+ if (!ListResult.IsSuccess())
+ {
+ throw zen::runtime_error("Failed to list S3 objects for deletion under '{}': {}", ModulePrefix, ListResult.Error);
}
+ for (const S3ObjectInfo& Obj : ListResult.Objects)
+ {
+ Work.ScheduleWork(WorkerPool, [this, Key = Obj.Key](std::atomic<bool>& AbortFlag) {
+ if (AbortFlag.load())
+ {
+ return;
+ }
+ S3Result DelResult = m_Client.DeleteObject(Key);
+ if (!DelResult.IsSuccess())
+ {
+ throw zen::runtime_error("Failed to delete S3 object '{}': {}", Key, DelResult.Error);
+ }
+ });
+ }
+ }
- static constexpr uint64_t DefaultMultipartChunkSize = 32u * 1024u * 1024u;
-
- std::string m_KeyPrefix;
- CbObject m_Options;
- std::string m_Bucket;
- std::string m_Region;
- SigV4Credentials m_Credentials;
- Ref<ImdsCredentialProvider> m_CredentialProvider;
- std::unique_ptr<S3Client> m_Client;
- std::filesystem::path m_TempDir;
- uint64_t m_MultipartChunkSize = DefaultMultipartChunkSize;
- };
-
-} // namespace hydration_impl
+ ///////////////////////////////////////////////////////////////////////
+ // IncrementalHydrator: the only HydrationStrategyBase implementation.
+ // Holds a per-module StorageBase and threading context; drives the
+ // hydrate/dehydrate algorithm.
-using namespace hydration_impl;
+ class IncrementalHydrator : public HydrationStrategyBase
+ {
+ public:
+ IncrementalHydrator(const HydrationConfig& Config, std::unique_ptr<StorageBase> Storage);
+ virtual ~IncrementalHydrator() override;
-///////////////////////////////////////////////////////////////////////////
+ virtual void Dehydrate(const CbObject& CachedState) override;
+ virtual CbObject Hydrate() override;
+ virtual void Obliterate() override;
-class IncrementalHydrator : public HydrationStrategyBase
-{
-public:
- IncrementalHydrator(std::unique_ptr<StorageBase>&& Storage);
- virtual ~IncrementalHydrator() override;
- virtual void Configure(const HydrationConfig& Config) override;
- virtual void Dehydrate(const CbObject& CachedState) override;
- virtual CbObject Hydrate() override;
- virtual void Obliterate() override;
+ private:
+ struct Entry
+ {
+ std::filesystem::path RelativePath;
+ uint64_t Size;
+ uint64_t ModTick;
+ IoHash Hash;
+ };
-private:
- struct Entry
- {
- std::filesystem::path RelativePath;
- uint64_t Size;
- uint64_t ModTick;
- IoHash Hash;
+ std::unique_ptr<StorageBase> m_Storage;
+ HydrationConfig m_Config;
+ WorkerThreadPool m_FallbackWorkPool;
+ std::atomic<bool> m_FallbackAbortFlag{false};
+ std::atomic<bool> m_FallbackPauseFlag{false};
+ HydrationConfig::ThreadingOptions m_Threading{.WorkerPool = &m_FallbackWorkPool,
+ .AbortFlag = &m_FallbackAbortFlag,
+ .PauseFlag = &m_FallbackPauseFlag};
};
- std::unique_ptr<StorageBase> m_Storage;
- HydrationConfig m_Config;
- WorkerThreadPool m_FallbackWorkPool;
- std::atomic<bool> m_FallbackAbortFlag{false};
- std::atomic<bool> m_FallbackPauseFlag{false};
- HydrationConfig::ThreadingOptions m_Threading{.WorkerPool = &m_FallbackWorkPool,
- .AbortFlag = &m_FallbackAbortFlag,
- .PauseFlag = &m_FallbackPauseFlag};
-};
+ ///////////////////////////////////////////////////////////////////////
+ // IncrementalHydrator implementations
-IncrementalHydrator::IncrementalHydrator(std::unique_ptr<StorageBase>&& Storage) : m_Storage(std::move(Storage)), m_FallbackWorkPool(0)
-{
-}
-
-IncrementalHydrator::~IncrementalHydrator()
-{
- m_Storage.reset();
-}
-
-void
-IncrementalHydrator::Configure(const HydrationConfig& Config)
-{
- m_Config = Config;
- m_Storage->Configure(Config.ModuleId, Config.TempDir, Config.TargetSpecification, Config.Options);
- if (Config.Threading)
+ IncrementalHydrator::IncrementalHydrator(const HydrationConfig& Config, std::unique_ptr<StorageBase> Storage)
+ : m_Storage(std::move(Storage))
+ , m_Config(Config)
+ , m_FallbackWorkPool(0)
{
- m_Threading = *Config.Threading;
+ if (Config.Threading)
+ {
+ m_Threading = *Config.Threading;
+ }
}
-}
-void
-IncrementalHydrator::Dehydrate(const CbObject& CachedState)
-{
- Stopwatch Timer;
+ IncrementalHydrator::~IncrementalHydrator() { m_Storage.reset(); }
- const std::filesystem::path ServerStateDir = MakeSafeAbsolutePath(m_Config.ServerStateDir);
- try
+ void IncrementalHydrator::Dehydrate(const CbObject& CachedState)
{
- std::unordered_map<std::string, size_t> StateEntryLookup;
- std::vector<Entry> StateEntries;
- for (CbFieldView FieldView : CachedState["Files"].AsArrayView())
- {
- CbObjectView EntryView = FieldView.AsObjectView();
- std::filesystem::path RelativePath(EntryView["Path"].AsString());
- uint64_t Size = EntryView["Size"].AsUInt64();
- uint64_t ModTick = EntryView["ModTick"].AsUInt64();
- IoHash Hash = EntryView["Hash"].AsHash();
-
- StateEntryLookup.insert_or_assign(RelativePath.generic_string(), StateEntries.size());
- StateEntries.push_back(Entry{.RelativePath = RelativePath, .Size = Size, .ModTick = ModTick, .Hash = Hash});
- }
+ Stopwatch Timer;
- DirectoryContent DirContent;
- GetDirectoryContent(*m_Threading.WorkerPool,
- ServerStateDir,
- DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::Recursive |
- DirectoryContentFlags::IncludeFileSizes | DirectoryContentFlags::IncludeModificationTick,
- DirContent);
+ const std::filesystem::path ServerStateDir = MakeSafeAbsolutePath(m_Config.ServerStateDir);
+ try
+ {
+ std::unordered_map<std::string, size_t> StateEntryLookup;
+ std::vector<Entry> StateEntries;
+ for (CbFieldView FieldView : CachedState["Files"].AsArrayView())
+ {
+ CbObjectView EntryView = FieldView.AsObjectView();
+ std::filesystem::path RelativePath(EntryView["Path"].AsString());
+ uint64_t Size = EntryView["Size"].AsUInt64();
+ uint64_t ModTick = EntryView["ModTick"].AsUInt64();
+ IoHash Hash = EntryView["Hash"].AsHash();
+
+ StateEntryLookup.insert_or_assign(RelativePath.generic_string(), StateEntries.size());
+ StateEntries.push_back(Entry{.RelativePath = RelativePath, .Size = Size, .ModTick = ModTick, .Hash = Hash});
+ }
- ZEN_INFO("Dehydrating module '{}' from folder '{}'. {} ({}) files",
- m_Config.ModuleId,
- m_Config.ServerStateDir,
- DirContent.Files.size(),
- NiceBytes(std::accumulate(DirContent.FileSizes.begin(), DirContent.FileSizes.end(), uint64_t(0))));
+ DirectoryContent DirContent;
+ GetDirectoryContent(*m_Threading.WorkerPool,
+ ServerStateDir,
+ DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::Recursive |
+ DirectoryContentFlags::IncludeFileSizes | DirectoryContentFlags::IncludeModificationTick,
+ DirContent);
- std::vector<Entry> Entries;
- Entries.resize(DirContent.Files.size());
+ ZEN_INFO("Dehydrating module '{}' from folder '{}'. {} ({}) files",
+ m_Config.ModuleId,
+ m_Config.ServerStateDir,
+ DirContent.Files.size(),
+ NiceBytes(std::accumulate(DirContent.FileSizes.begin(), DirContent.FileSizes.end(), uint64_t(0))));
- uint64_t TotalBytes = 0;
- uint64_t TotalFiles = 0;
- uint64_t HashedFiles = 0;
- uint64_t HashedBytes = 0;
+ std::vector<Entry> Entries;
+ Entries.resize(DirContent.Files.size());
- std::unordered_set<IoHash> ExistsLookup;
+ uint64_t TotalBytes = 0;
+ uint64_t TotalFiles = 0;
+ uint64_t HashedFiles = 0;
+ uint64_t HashedBytes = 0;
- {
- ParallelWork Work(*m_Threading.AbortFlag, *m_Threading.PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
+ std::unordered_set<IoHash> ExistsLookup;
- for (size_t FileIndex = 0; FileIndex < DirContent.Files.size(); FileIndex++)
{
- const std::filesystem::path AbsPath = MakeSafeAbsolutePath(DirContent.Files[FileIndex]);
- if (AbsPath.filename() == "reserve.gc")
- {
- continue;
- }
- const std::filesystem::path RelativePath = FastRelativePath(ServerStateDir, DirContent.Files[FileIndex]);
- if (*RelativePath.begin() == ".sentry-native")
- {
- continue;
- }
- if (RelativePath == ".lock")
- {
- continue;
- }
+ ParallelWork Work(*m_Threading.AbortFlag, *m_Threading.PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
- Entry& CurrentEntry = Entries[TotalFiles];
- CurrentEntry.RelativePath = RelativePath;
- CurrentEntry.Size = DirContent.FileSizes[FileIndex];
- CurrentEntry.ModTick = DirContent.FileModificationTicks[FileIndex];
-
- bool FoundHash = false;
- if (auto KnownIt = StateEntryLookup.find(CurrentEntry.RelativePath.generic_string()); KnownIt != StateEntryLookup.end())
+ for (size_t FileIndex = 0; FileIndex < DirContent.Files.size(); FileIndex++)
{
- const Entry& StateEntry = StateEntries[KnownIt->second];
- if (StateEntry.Size == CurrentEntry.Size && StateEntry.ModTick == CurrentEntry.ModTick)
+ const std::filesystem::path AbsPath = MakeSafeAbsolutePath(DirContent.Files[FileIndex]);
+ if (AbsPath.filename() == "reserve.gc")
{
- CurrentEntry.Hash = StateEntry.Hash;
- FoundHash = true;
+ continue;
+ }
+ const std::filesystem::path RelativePath = FastRelativePath(ServerStateDir, DirContent.Files[FileIndex]);
+ if (*RelativePath.begin() == ".sentry-native")
+ {
+ continue;
+ }
+ if (RelativePath == ".lock")
+ {
+ continue;
}
- }
-
- if (!FoundHash)
- {
- Work.ScheduleWork(*m_Threading.WorkerPool, [AbsPath, EntryIndex = TotalFiles, &Entries](std::atomic<bool>& AbortFlag) {
- if (AbortFlag)
- {
- return;
- }
- Entry& CurrentEntry = Entries[EntryIndex];
+ Entry& CurrentEntry = Entries[TotalFiles];
+ CurrentEntry.RelativePath = RelativePath;
+ CurrentEntry.Size = DirContent.FileSizes[FileIndex];
+ CurrentEntry.ModTick = DirContent.FileModificationTicks[FileIndex];
- bool FoundHash = false;
- if (AbsPath.extension().empty())
+ bool FoundHash = false;
+ if (auto KnownIt = StateEntryLookup.find(CurrentEntry.RelativePath.generic_string()); KnownIt != StateEntryLookup.end())
+ {
+ const Entry& StateEntry = StateEntries[KnownIt->second];
+ if (StateEntry.Size == CurrentEntry.Size && StateEntry.ModTick == CurrentEntry.ModTick)
{
- auto It = CurrentEntry.RelativePath.begin();
- if (It != CurrentEntry.RelativePath.end() && It->filename().string().ends_with("cas"))
- {
- IoHash RawHash;
- uint64_t RawSize;
- CompressedBuffer Compressed =
- CompressedBuffer::FromCompressed(SharedBuffer(IoBufferBuilder::MakeFromFile(AbsPath)),
- RawHash,
- RawSize);
- if (Compressed)
- {
- // We compose a meta-hash since taking the RawHash might collide with an existing
- // non-compressed file with the same content The collision is unlikely except if the
- // compressed data is zero bytes causing RawHash to be the same as an empty file.
- IoHashStream Hasher;
- Hasher.Append(RawHash.Hash, sizeof(RawHash.Hash));
- Hasher.Append(&CurrentEntry.Size, sizeof(CurrentEntry.Size));
- CurrentEntry.Hash = Hasher.GetHash();
- FoundHash = true;
- }
- }
+ CurrentEntry.Hash = StateEntry.Hash;
+ FoundHash = true;
}
+ }
- if (!FoundHash)
- {
- CurrentEntry.Hash = IoHash::HashBuffer(IoBufferBuilder::MakeFromFile(AbsPath));
- }
- });
- HashedFiles++;
- HashedBytes += CurrentEntry.Size;
+ if (!FoundHash)
+ {
+ Work.ScheduleWork(*m_Threading.WorkerPool,
+ [AbsPath, EntryIndex = TotalFiles, &Entries](std::atomic<bool>& AbortFlag) {
+ if (AbortFlag)
+ {
+ return;
+ }
+
+ Entry& CurrentEntry = Entries[EntryIndex];
+
+ bool FoundHash = false;
+ if (AbsPath.extension().empty())
+ {
+ auto It = CurrentEntry.RelativePath.begin();
+ if (It != CurrentEntry.RelativePath.end() && It->filename().string().ends_with("cas"))
+ {
+ IoHash RawHash;
+ uint64_t RawSize;
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressed(
+ SharedBuffer(IoBufferBuilder::MakeFromFile(AbsPath)),
+ RawHash,
+ RawSize);
+ if (Compressed)
+ {
+ // We compose a meta-hash since taking the RawHash might collide with an
+ // existing non-compressed file with the same content The collision is
+ // unlikely except if the compressed data is zero bytes causing RawHash
+ // to be the same as an empty file.
+ IoHashStream Hasher;
+ Hasher.Append(RawHash.Hash, sizeof(RawHash.Hash));
+ Hasher.Append(&CurrentEntry.Size, sizeof(CurrentEntry.Size));
+ CurrentEntry.Hash = Hasher.GetHash();
+ FoundHash = true;
+ }
+ }
+ }
+
+ if (!FoundHash)
+ {
+ CurrentEntry.Hash = IoHash::HashBuffer(IoBufferBuilder::MakeFromFile(AbsPath));
+ }
+ });
+ HashedFiles++;
+ HashedBytes += CurrentEntry.Size;
+ }
+ TotalFiles++;
+ TotalBytes += CurrentEntry.Size;
}
- TotalFiles++;
- TotalBytes += CurrentEntry.Size;
- }
- std::vector<IoHash> ExistingEntries = m_Storage->List();
- ExistsLookup.insert(ExistingEntries.begin(), ExistingEntries.end());
+ std::vector<IoHash> ExistingEntries = m_Storage->List();
+ ExistsLookup.insert(ExistingEntries.begin(), ExistingEntries.end());
- Work.Wait();
-
- Entries.resize(TotalFiles);
- }
+ Work.Wait();
- uint64_t UploadedFiles = 0;
- uint64_t UploadedBytes = 0;
- uint64_t TouchedFiles = 0;
- uint64_t TouchedBytes = 0;
- {
- ParallelWork Work(*m_Threading.AbortFlag, *m_Threading.PauseFlag, WorkerThreadPool::EMode::DisableBacklog);
+ Entries.resize(TotalFiles);
+ }
- for (const Entry& CurrentEntry : Entries)
+ uint64_t UploadedFiles = 0;
+ uint64_t UploadedBytes = 0;
+ uint64_t TouchedFiles = 0;
+ uint64_t TouchedBytes = 0;
{
- if (!ExistsLookup.contains(CurrentEntry.Hash))
- {
- m_Storage->Put(Work,
- *m_Threading.WorkerPool,
- CurrentEntry.Hash,
- CurrentEntry.Size,
- MakeSafeAbsolutePath(ServerStateDir / CurrentEntry.RelativePath));
- UploadedFiles++;
- UploadedBytes += CurrentEntry.Size;
- }
- else
+ ParallelWork Work(*m_Threading.AbortFlag, *m_Threading.PauseFlag, WorkerThreadPool::EMode::DisableBacklog);
+
+ for (const Entry& CurrentEntry : Entries)
{
- // Refresh the backend's modification time so lifecycle-expiration policies
- // do not evict CAS entries that are still referenced by this module.
- m_Storage->Touch(Work, *m_Threading.WorkerPool, CurrentEntry.Hash);
- TouchedFiles++;
- TouchedBytes += CurrentEntry.Size;
+ if (!ExistsLookup.contains(CurrentEntry.Hash))
+ {
+ m_Storage->Put(Work,
+ *m_Threading.WorkerPool,
+ CurrentEntry.Hash,
+ CurrentEntry.Size,
+ MakeSafeAbsolutePath(ServerStateDir / CurrentEntry.RelativePath));
+ UploadedFiles++;
+ UploadedBytes += CurrentEntry.Size;
+ }
+ else
+ {
+ // Refresh the backend's modification time so lifecycle-expiration policies
+ // do not evict CAS entries that are still referenced by this module.
+ m_Storage->Touch(Work, *m_Threading.WorkerPool, CurrentEntry.Hash);
+ TouchedFiles++;
+ TouchedBytes += CurrentEntry.Size;
+ }
}
- }
- Work.Wait();
- uint64_t UploadTimeMs = Timer.GetElapsedTimeMs();
-
- UtcTime Now = UtcTime::Now();
- std::string UploadTimeUtc = fmt::format("{:04d}-{:02d}-{:02d}T{:02d}:{:02d}:{:02d}.{:03d}Z",
- Now.Tm.tm_year + 1900,
- Now.Tm.tm_mon + 1,
- Now.Tm.tm_mday,
- Now.Tm.tm_hour,
- Now.Tm.tm_min,
- Now.Tm.tm_sec,
- Now.Ms);
-
- CbObjectWriter Meta;
- Meta << "SourceFolder" << ServerStateDir.generic_string();
- Meta << "ModuleId" << m_Config.ModuleId;
- Meta << "HostName" << GetMachineName();
- Meta << "UploadTimeUtc" << UploadTimeUtc;
- Meta << "UploadDurationMs" << UploadTimeMs;
- Meta << "TotalSizeBytes" << TotalBytes;
- Meta << "StorageSettings" << m_Storage->GetSettings();
-
- Meta.BeginArray("Files");
- for (const Entry& CurrentEntry : Entries)
- {
- Meta.BeginObject();
+ Work.Wait();
+ uint64_t UploadTimeMs = Timer.GetElapsedTimeMs();
+
+ UtcTime Now = UtcTime::Now();
+ std::string UploadTimeUtc = fmt::format("{:04d}-{:02d}-{:02d}T{:02d}:{:02d}:{:02d}.{:03d}Z",
+ Now.Tm.tm_year + 1900,
+ Now.Tm.tm_mon + 1,
+ Now.Tm.tm_mday,
+ Now.Tm.tm_hour,
+ Now.Tm.tm_min,
+ Now.Tm.tm_sec,
+ Now.Ms);
+
+ CbObjectWriter Meta;
+ Meta << "SourceFolder" << ServerStateDir.generic_string();
+ Meta << "ModuleId" << m_Config.ModuleId;
+ Meta << "HostName" << GetMachineName();
+ Meta << "UploadTimeUtc" << UploadTimeUtc;
+ Meta << "UploadDurationMs" << UploadTimeMs;
+ Meta << "TotalSizeBytes" << TotalBytes;
+ Meta << "StorageSettings" << m_Storage->GetSettings();
+
+ Meta.BeginArray("Files");
+ for (const Entry& CurrentEntry : Entries)
{
- Meta << "Path" << CurrentEntry.RelativePath.generic_string();
- Meta << "Size" << CurrentEntry.Size;
- Meta << "ModTick" << CurrentEntry.ModTick;
- Meta << "Hash" << CurrentEntry.Hash;
+ Meta.BeginObject();
+ {
+ Meta << "Path" << CurrentEntry.RelativePath.generic_string();
+ Meta << "Size" << CurrentEntry.Size;
+ Meta << "ModTick" << CurrentEntry.ModTick;
+ Meta << "Hash" << CurrentEntry.Hash;
+ }
+ Meta.EndObject();
}
- Meta.EndObject();
- }
- Meta.EndArray();
-
- m_Storage->SaveMetadata(Meta.Save());
- }
+ Meta.EndArray();
- ZEN_DEBUG("Cleaning server state '{}'", m_Config.ServerStateDir);
- CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, ServerStateDir);
-
- ZEN_INFO(
- "Dehydration of module '{}' completed from folder '{}'. Hashed {} ({}). Uploaded {} ({}). Touched {} ({}). Total {} ({}) in {}",
- m_Config.ModuleId,
- m_Config.ServerStateDir,
- HashedFiles,
- NiceBytes(HashedBytes),
- UploadedFiles,
- NiceBytes(UploadedBytes),
- TouchedFiles,
- NiceBytes(TouchedBytes),
- TotalFiles,
- NiceBytes(TotalBytes),
- NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
- }
- catch (const std::exception& Ex)
- {
- ZEN_WARN("Dehydration of module '{}' failed: {}. Leaving server state '{}'", m_Config.ModuleId, Ex.what(), m_Config.ServerStateDir);
- }
-}
+ m_Storage->SaveMetadata(Meta.Save());
+ }
-CbObject
-IncrementalHydrator::Hydrate()
-{
- Stopwatch Timer;
+ ZEN_DEBUG("Cleaning server state '{}'", m_Config.ServerStateDir);
+ CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, ServerStateDir);
- const std::filesystem::path ServerStateDir = MakeSafeAbsolutePath(m_Config.ServerStateDir);
- const std::filesystem::path TempDir = MakeSafeAbsolutePath(m_Config.TempDir);
- try
- {
- CbObject Meta = m_Storage->LoadMetadata();
- if (!Meta)
+ ZEN_INFO(
+ "Dehydration of module '{}' completed from folder '{}'. Hashed {} ({}). Uploaded {} ({}). Touched {} ({}). Total {} ({}) "
+ "in {}",
+ m_Config.ModuleId,
+ m_Config.ServerStateDir,
+ HashedFiles,
+ NiceBytes(HashedBytes),
+ UploadedFiles,
+ NiceBytes(UploadedBytes),
+ TouchedFiles,
+ NiceBytes(TouchedBytes),
+ TotalFiles,
+ NiceBytes(TotalBytes),
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ }
+ catch (const std::exception& Ex)
{
- ZEN_INFO("No dehydrated state for module {} found, cleaning server state: '{}'", m_Config.ModuleId, m_Config.ServerStateDir);
- CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, ServerStateDir);
- return CbObject();
+ ZEN_WARN("Dehydration of module '{}' failed: {}. Leaving server state '{}'",
+ m_Config.ModuleId,
+ Ex.what(),
+ m_Config.ServerStateDir);
}
+ }
- std::unordered_map<std::string, size_t> EntryLookup;
- std::vector<Entry> Entries;
- uint64_t TotalSize = 0;
+ CbObject IncrementalHydrator::Hydrate()
+ {
+ Stopwatch Timer;
- for (CbFieldView FieldView : Meta["Files"])
+ const std::filesystem::path ServerStateDir = MakeSafeAbsolutePath(m_Config.ServerStateDir);
+ const std::filesystem::path TempDir = MakeSafeAbsolutePath(m_Config.TempDir);
+ try
{
- CbObjectView EntryView = FieldView.AsObjectView();
- if (EntryView)
+ CbObject Meta = m_Storage->LoadMetadata();
+ if (!Meta)
{
- Entry NewEntry = {.RelativePath = std::filesystem::path(EntryView["Path"].AsString()),
- .Size = EntryView["Size"].AsUInt64(),
- .ModTick = EntryView["ModTick"].AsUInt64(),
- .Hash = EntryView["Hash"].AsHash()};
- TotalSize += NewEntry.Size;
- EntryLookup.insert_or_assign(NewEntry.RelativePath.generic_string(), Entries.size());
- Entries.emplace_back(std::move(NewEntry));
+ ZEN_INFO("No dehydrated state for module {} found, cleaning server state: '{}'",
+ m_Config.ModuleId,
+ m_Config.ServerStateDir);
+ CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, ServerStateDir);
+ return CbObject();
}
- }
- ZEN_INFO("Hydrating module '{}' to folder '{}'. {} ({}) files",
- m_Config.ModuleId,
- m_Config.ServerStateDir,
- Entries.size(),
- NiceBytes(TotalSize));
+ std::unordered_map<std::string, size_t> EntryLookup;
+ std::vector<Entry> Entries;
+ uint64_t TotalSize = 0;
+
+ for (CbFieldView FieldView : Meta["Files"])
+ {
+ CbObjectView EntryView = FieldView.AsObjectView();
+ if (EntryView)
+ {
+ Entry NewEntry = {.RelativePath = std::filesystem::path(EntryView["Path"].AsString()),
+ .Size = EntryView["Size"].AsUInt64(),
+ .ModTick = EntryView["ModTick"].AsUInt64(),
+ .Hash = EntryView["Hash"].AsHash()};
+ TotalSize += NewEntry.Size;
+ EntryLookup.insert_or_assign(NewEntry.RelativePath.generic_string(), Entries.size());
+ Entries.emplace_back(std::move(NewEntry));
+ }
+ }
- m_Storage->ParseSettings(Meta["StorageSettings"].AsObjectView());
+ ZEN_INFO("Hydrating module '{}' to folder '{}'. {} ({}) files",
+ m_Config.ModuleId,
+ m_Config.ServerStateDir,
+ Entries.size(),
+ NiceBytes(TotalSize));
- uint64_t DownloadedBytes = 0;
- uint64_t DownloadedFiles = 0;
+ m_Storage->ParseSettings(Meta["StorageSettings"].AsObjectView());
- {
- ParallelWork Work(*m_Threading.AbortFlag, *m_Threading.PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
+ uint64_t DownloadedBytes = 0;
+ uint64_t DownloadedFiles = 0;
- for (const Entry& CurrentEntry : Entries)
{
- std::filesystem::path Path = MakeSafeAbsolutePath(TempDir / CurrentEntry.RelativePath);
- CreateDirectories(Path.parent_path());
- m_Storage->Get(Work, *m_Threading.WorkerPool, CurrentEntry.Hash, CurrentEntry.Size, Path);
- DownloadedBytes += CurrentEntry.Size;
- DownloadedFiles++;
+ ParallelWork Work(*m_Threading.AbortFlag, *m_Threading.PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
+
+ for (const Entry& CurrentEntry : Entries)
+ {
+ std::filesystem::path Path = MakeSafeAbsolutePath(TempDir / CurrentEntry.RelativePath);
+ CreateDirectories(Path.parent_path());
+ m_Storage->Get(Work, *m_Threading.WorkerPool, CurrentEntry.Hash, CurrentEntry.Size, Path);
+ DownloadedBytes += CurrentEntry.Size;
+ DownloadedFiles++;
+ }
+
+ Work.Wait();
}
- Work.Wait();
- }
+ // Downloaded successfully - swap into ServerStateDir
+ ZEN_DEBUG("Cleaning server state '{}'", m_Config.ServerStateDir);
+ CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, ServerStateDir);
- // Downloaded successfully - swap into ServerStateDir
- ZEN_DEBUG("Cleaning server state '{}'", m_Config.ServerStateDir);
- CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, ServerStateDir);
+ // If the two paths share at least one common component they are on the same drive/volume
+ // and atomic renames will succeed. Otherwise fall back to a full copy.
+ auto [ItTmp, ItState] = std::mismatch(TempDir.begin(), TempDir.end(), ServerStateDir.begin(), ServerStateDir.end());
+ if (ItTmp != TempDir.begin())
+ {
+ DirectoryContent DirContent;
+ GetDirectoryContent(*m_Threading.WorkerPool,
+ TempDir,
+ DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::IncludeDirs,
+ DirContent);
- // If the two paths share at least one common component they are on the same drive/volume
- // and atomic renames will succeed. Otherwise fall back to a full copy.
- auto [ItTmp, ItState] = std::mismatch(TempDir.begin(), TempDir.end(), ServerStateDir.begin(), ServerStateDir.end());
- if (ItTmp != TempDir.begin())
- {
+ for (const std::filesystem::path& AbsPath : DirContent.Directories)
+ {
+ std::filesystem::path Dest = MakeSafeAbsolutePath(ServerStateDir / AbsPath.filename());
+ std::error_code Ec = RenameDirectoryWithRetry(AbsPath, Dest);
+ if (Ec)
+ {
+ throw std::system_error(Ec, fmt::format("Failed to rename directory from '{}' to '{}'", AbsPath, Dest));
+ }
+ }
+ for (const std::filesystem::path& AbsPath : DirContent.Files)
+ {
+ std::filesystem::path Dest = MakeSafeAbsolutePath(ServerStateDir / AbsPath.filename());
+ std::error_code Ec = RenameFileWithRetry(AbsPath, Dest);
+ if (Ec)
+ {
+ throw std::system_error(Ec, fmt::format("Failed to rename file from '{}' to '{}'", AbsPath, Dest));
+ }
+ }
+
+ ZEN_DEBUG("Cleaning temp dir '{}'", m_Config.TempDir);
+ CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, TempDir);
+ }
+ else
+ {
+ // Slow path: TempDir and ServerStateDir are on different filesystems, so rename
+ // would fail. Copy the tree instead and clean up the temp files afterwards.
+ ZEN_DEBUG("TempDir and ServerStateDir are on different filesystems - using CopyTree");
+ CopyTree(TempDir, ServerStateDir, {.EnableClone = true});
+ ZEN_DEBUG("Cleaning temp dir '{}'", m_Config.TempDir);
+ CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, TempDir);
+ }
+
+ // TODO: This could perhaps be done more efficently, but ok for now
DirectoryContent DirContent;
GetDirectoryContent(*m_Threading.WorkerPool,
- TempDir,
- DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::IncludeDirs,
+ ServerStateDir,
+ DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::Recursive |
+ DirectoryContentFlags::IncludeFileSizes | DirectoryContentFlags::IncludeModificationTick,
DirContent);
- for (const std::filesystem::path& AbsPath : DirContent.Directories)
+ CbObjectWriter HydrateState;
+ HydrateState.BeginArray("Files");
+ for (size_t FileIndex = 0; FileIndex < DirContent.Files.size(); FileIndex++)
{
- std::filesystem::path Dest = MakeSafeAbsolutePath(ServerStateDir / AbsPath.filename());
- std::error_code Ec = RenameDirectoryWithRetry(AbsPath, Dest);
- if (Ec)
+ std::filesystem::path RelativePath = FastRelativePath(ServerStateDir, DirContent.Files[FileIndex]);
+
+ if (auto It = EntryLookup.find(RelativePath.generic_string()); It != EntryLookup.end())
{
- throw std::system_error(Ec, fmt::format("Failed to rename directory from '{}' to '{}'", AbsPath, Dest));
+ HydrateState.BeginObject();
+ {
+ HydrateState << "Path" << RelativePath.generic_string();
+ HydrateState << "Size" << DirContent.FileSizes[FileIndex];
+ HydrateState << "ModTick" << DirContent.FileModificationTicks[FileIndex];
+ HydrateState << "Hash" << Entries[It->second].Hash;
+ }
+ HydrateState.EndObject();
}
- }
- for (const std::filesystem::path& AbsPath : DirContent.Files)
- {
- std::filesystem::path Dest = MakeSafeAbsolutePath(ServerStateDir / AbsPath.filename());
- std::error_code Ec = RenameFileWithRetry(AbsPath, Dest);
- if (Ec)
+ else
{
- throw std::system_error(Ec, fmt::format("Failed to rename file from '{}' to '{}'", AbsPath, Dest));
+ ZEN_ASSERT(false);
}
}
+ HydrateState.EndArray();
- ZEN_DEBUG("Cleaning temp dir '{}'", m_Config.TempDir);
- CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, TempDir);
+ CbObject StateObject = HydrateState.Save();
+
+ ZEN_INFO("Hydration of module '{}' complete to folder '{}'. {} ({}) files in {}",
+ m_Config.ModuleId,
+ m_Config.ServerStateDir,
+ DownloadedFiles,
+ NiceBytes(DownloadedBytes),
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+
+ return StateObject;
}
- else
+ catch (const std::exception& Ex)
{
- // Slow path: TempDir and ServerStateDir are on different filesystems, so rename
- // would fail. Copy the tree instead and clean up the temp files afterwards.
- ZEN_DEBUG("TempDir and ServerStateDir are on different filesystems - using CopyTree");
- CopyTree(TempDir, ServerStateDir, {.EnableClone = true});
+ ZEN_WARN("Hydration of module '{}' failed: {}. Cleaning server state '{}'",
+ m_Config.ModuleId,
+ Ex.what(),
+ m_Config.ServerStateDir);
+ CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, ServerStateDir);
ZEN_DEBUG("Cleaning temp dir '{}'", m_Config.TempDir);
CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, TempDir);
+ return {};
}
+ }
- // TODO: This could perhaps be done more efficently, but ok for now
- DirectoryContent DirContent;
- GetDirectoryContent(*m_Threading.WorkerPool,
- ServerStateDir,
- DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::Recursive |
- DirectoryContentFlags::IncludeFileSizes | DirectoryContentFlags::IncludeModificationTick,
- DirContent);
-
- CbObjectWriter HydrateState;
- HydrateState.BeginArray("Files");
- for (size_t FileIndex = 0; FileIndex < DirContent.Files.size(); FileIndex++)
- {
- std::filesystem::path RelativePath = FastRelativePath(ServerStateDir, DirContent.Files[FileIndex]);
+ void IncrementalHydrator::Obliterate()
+ {
+ const std::filesystem::path ServerStateDir = MakeSafeAbsolutePath(m_Config.ServerStateDir);
+ const std::filesystem::path TempDir = MakeSafeAbsolutePath(m_Config.TempDir);
- if (auto It = EntryLookup.find(RelativePath.generic_string()); It != EntryLookup.end())
- {
- HydrateState.BeginObject();
- {
- HydrateState << "Path" << RelativePath.generic_string();
- HydrateState << "Size" << DirContent.FileSizes[FileIndex];
- HydrateState << "ModTick" << DirContent.FileModificationTicks[FileIndex];
- HydrateState << "Hash" << Entries[It->second].Hash;
- }
- HydrateState.EndObject();
- }
- else
- {
- ZEN_ASSERT(false);
- }
+ try
+ {
+ ParallelWork Work(*m_Threading.AbortFlag, *m_Threading.PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
+ m_Storage->Delete(Work, *m_Threading.WorkerPool);
+ Work.Wait();
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_WARN("Failed to delete backend storage for module '{}': {}. Proceeding with local cleanup.", m_Config.ModuleId, Ex.what());
}
- HydrateState.EndArray();
-
- CbObject StateObject = HydrateState.Save();
-
- ZEN_INFO("Hydration of module '{}' complete to folder '{}'. {} ({}) files in {}",
- m_Config.ModuleId,
- m_Config.ServerStateDir,
- DownloadedFiles,
- NiceBytes(DownloadedBytes),
- NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
- return StateObject;
- }
- catch (const std::exception& Ex)
- {
- ZEN_WARN("Hydration of module '{}' failed: {}. Cleaning server state '{}'", m_Config.ModuleId, Ex.what(), m_Config.ServerStateDir);
CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, ServerStateDir);
- ZEN_DEBUG("Cleaning temp dir '{}'", m_Config.TempDir);
CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, TempDir);
- return {};
}
-}
-void
-IncrementalHydrator::Obliterate()
+} // namespace hydration_impl
+
+///////////////////////////////////////////////////////////////////////////
+// HydrationBase subclasses - own hub-wide backend state, hand per-module
+// storages the exact inputs they need in CreateHydrator.
+
+class FileHydration : public HydrationBase
{
- const std::filesystem::path ServerStateDir = MakeSafeAbsolutePath(m_Config.ServerStateDir);
- const std::filesystem::path TempDir = MakeSafeAbsolutePath(m_Config.TempDir);
+public:
+ explicit FileHydration(const Configuration& Config);
+
+ virtual std::unique_ptr<HydrationStrategyBase> CreateHydrator(const HydrationConfig& Config) override;
+
+private:
+ std::filesystem::path m_StorageRoot;
+};
- try
+class S3Hydration : public HydrationBase
+{
+public:
+ explicit S3Hydration(const Configuration& Config);
+
+ virtual std::unique_ptr<HydrationStrategyBase> CreateHydrator(const HydrationConfig& Config) override;
+
+private:
+ std::string m_Bucket;
+ std::string m_Region;
+ std::string m_Endpoint;
+ bool m_PathStyle = false;
+ std::string m_KeyPrefixRoot;
+ SigV4Credentials m_Credentials;
+ Ref<ImdsCredentialProvider> m_CredentialProvider;
+ std::unique_ptr<S3Client> m_Client;
+ uint64_t m_DefaultMultipartChunkSize;
+};
+
+///////////////////////////////////////////////////////////////////////////
+// Implementations
+
+FileHydration::FileHydration(const Configuration& Config)
+{
+ if (!Config.TargetSpecification.empty())
{
- ParallelWork Work(*m_Threading.AbortFlag, *m_Threading.PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
- m_Storage->Delete(Work, *m_Threading.WorkerPool);
- Work.Wait();
+ m_StorageRoot = Utf8ToWide(Config.TargetSpecification.substr(hydration_impl::FileStorage::Prefix.length()));
+ if (m_StorageRoot.empty())
+ {
+ throw zen::runtime_error("Hydration config 'file' type requires a directory path");
+ }
}
- catch (const std::exception& Ex)
+ else
{
- ZEN_WARN("Failed to delete backend storage for module '{}': {}. Proceeding with local cleanup.", m_Config.ModuleId, Ex.what());
+ CbObjectView Settings = Config.Options["settings"].AsObjectView();
+ std::string_view Path = Settings["path"].AsString();
+ if (Path.empty())
+ {
+ throw zen::runtime_error("Hydration config 'file' type requires 'settings.path'");
+ }
+ m_StorageRoot = Utf8ToWide(std::string(Path));
}
-
- CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, ServerStateDir);
- CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, TempDir);
+ MakeSafeAbsolutePathInPlace(m_StorageRoot);
}
std::unique_ptr<HydrationStrategyBase>
-CreateHydrator(const HydrationConfig& Config)
+FileHydration::CreateHydrator(const HydrationConfig& Config)
{
- std::unique_ptr<StorageBase> Storage;
+ using namespace hydration_impl;
+ return std::make_unique<IncrementalHydrator>(Config, std::make_unique<FileStorage>(m_StorageRoot / Config.ModuleId));
+}
+S3Hydration::S3Hydration(const Configuration& Config)
+{
+ using namespace hydration_impl;
+
+ CbObjectView Settings = Config.Options["settings"].AsObjectView();
+ std::string_view Spec;
if (!Config.TargetSpecification.empty())
{
- if (StrCaseCompare(Config.TargetSpecification.substr(0, FileHydratorPrefix.length()), FileHydratorPrefix) == 0)
- {
- Storage = std::make_unique<FileStorage>();
- }
- else if (StrCaseCompare(Config.TargetSpecification.substr(0, S3HydratorPrefix.length()), S3HydratorPrefix) == 0)
- {
- Storage = std::make_unique<S3Storage>();
- }
- else
- {
- throw std::runtime_error(fmt::format("Unknown hydration strategy: {}", Config.TargetSpecification));
- }
+ Spec = Config.TargetSpecification;
+ Spec.remove_prefix(S3Storage::Prefix.size());
}
else
{
- std::string_view Type = Config.Options["type"].AsString();
- if (Type == FileHydratorType)
+ std::string_view Uri = Settings["uri"].AsString();
+ if (Uri.empty())
{
- Storage = std::make_unique<FileStorage>();
+ throw zen::runtime_error("Incremental S3 hydration config requires 'settings.uri'");
}
- else if (Type == S3HydratorType)
- {
- Storage = std::make_unique<S3Storage>();
- }
- else if (!Type.empty())
+ Spec = Uri;
+ Spec.remove_prefix(S3Storage::Prefix.size());
+ }
+
+ size_t SlashPos = Spec.find('/');
+ m_Bucket = std::string(SlashPos != std::string_view::npos ? Spec.substr(0, SlashPos) : Spec);
+ m_KeyPrefixRoot = SlashPos != std::string_view::npos ? std::string(Spec.substr(SlashPos + 1)) : std::string{};
+
+ if (m_Bucket.empty())
+ {
+ throw zen::runtime_error("Incremental S3 hydration config requires a bucket name");
+ }
+
+ std::string Region = std::string(Settings["region"].AsString());
+ if (Region.empty())
+ {
+ Region = GetEnvVariable("AWS_DEFAULT_REGION");
+ }
+ if (Region.empty())
+ {
+ Region = GetEnvVariable("AWS_REGION");
+ }
+ if (Region.empty())
+ {
+ Region = "us-east-1";
+ }
+ m_Region = std::move(Region);
+
+ std::string_view Endpoint = Settings["endpoint"].AsString();
+ if (!Endpoint.empty())
+ {
+ m_Endpoint = std::string(Endpoint);
+ m_PathStyle = Settings["path-style"].AsBool();
+ }
+
+ std::string AccessKeyId = GetEnvVariable("AWS_ACCESS_KEY_ID");
+ if (AccessKeyId.empty())
+ {
+ m_CredentialProvider = Ref<ImdsCredentialProvider>(new ImdsCredentialProvider({}));
+ }
+ else
+ {
+ m_Credentials.AccessKeyId = std::move(AccessKeyId);
+ m_Credentials.SecretAccessKey = GetEnvVariable("AWS_SECRET_ACCESS_KEY");
+ m_Credentials.SessionToken = GetEnvVariable("AWS_SESSION_TOKEN");
+ }
+
+ m_DefaultMultipartChunkSize = Settings["chunk-size"].AsUInt64(S3Storage::DefaultMultipartChunkSize);
+
+ S3ClientOptions ClientOptions;
+ ClientOptions.BucketName = m_Bucket;
+ ClientOptions.Region = m_Region;
+ ClientOptions.Endpoint = m_Endpoint;
+ ClientOptions.PathStyle = m_PathStyle;
+ if (m_CredentialProvider)
+ {
+ ClientOptions.CredentialProvider = m_CredentialProvider;
+ }
+ else
+ {
+ ClientOptions.Credentials = m_Credentials;
+ }
+ ClientOptions.HttpSettings.MaximumInMemoryDownloadSize = 16u * 1024u;
+
+ m_Client = std::make_unique<S3Client>(ClientOptions);
+}
+
+std::unique_ptr<HydrationStrategyBase>
+S3Hydration::CreateHydrator(const HydrationConfig& Config)
+{
+ using namespace hydration_impl;
+ std::string KeyPrefix = m_KeyPrefixRoot.empty() ? std::string(Config.ModuleId) : fmt::format("{}/{}", m_KeyPrefixRoot, Config.ModuleId);
+ return std::make_unique<IncrementalHydrator>(
+ Config,
+ std::make_unique<S3Storage>(*m_Client, std::move(KeyPrefix), Config.TempDir, m_DefaultMultipartChunkSize));
+}
+
+std::unique_ptr<HydrationBase>
+InitHydration(const HydrationBase::Configuration& Config)
+{
+ using namespace hydration_impl;
+
+ if (!Config.TargetSpecification.empty())
+ {
+ if (StrCaseCompare(Config.TargetSpecification.substr(0, FileStorage::Prefix.length()), FileStorage::Prefix) == 0)
{
- throw zen::runtime_error("Unknown hydration target type '{}'", Type);
+ return std::make_unique<FileHydration>(Config);
}
- else
+ if (StrCaseCompare(Config.TargetSpecification.substr(0, S3Storage::Prefix.length()), S3Storage::Prefix) == 0)
{
- throw zen::runtime_error("No hydration target configured");
+ return std::make_unique<S3Hydration>(Config);
}
+ throw zen::runtime_error("Unknown hydration strategy: {}", Config.TargetSpecification);
}
- auto Hydrator = std::make_unique<IncrementalHydrator>(std::move(Storage));
- Hydrator->Configure(Config);
- return Hydrator;
+ std::string_view Type = Config.Options["type"].AsString();
+ if (Type == FileStorage::Type)
+ {
+ return std::make_unique<FileHydration>(Config);
+ }
+ if (Type == S3Storage::Type)
+ {
+ return std::make_unique<S3Hydration>(Config);
+ }
+ if (!Type.empty())
+ {
+ throw zen::runtime_error("Unknown hydration target type '{}'", Type);
+ }
+ throw zen::runtime_error("No hydration target configured");
}
#if ZEN_WITH_TESTS
@@ -1226,27 +1297,19 @@ TEST_CASE("hydration.file.dehydrate_hydrate")
const std::string ModuleId = "testmodule";
auto TestFiles = CreateSmallTestTree(ServerStateDir);
- HydrationConfig Config;
- Config.ServerStateDir = ServerStateDir;
- Config.TempDir = HydrationTemp;
- Config.ModuleId = ModuleId;
- Config.TargetSpecification = "file://" + HydrationStore.string();
+ auto Hydration = InitHydration({.TargetSpecification = "file://" + HydrationStore.string()});
+
+ HydrationConfig Config{.ServerStateDir = ServerStateDir, .TempDir = HydrationTemp, .ModuleId = ModuleId};
// Dehydrate: copy server state to file store
- {
- std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
- Hydrator->Dehydrate(CbObject());
- }
+ Hydration->CreateHydrator(Config)->Dehydrate(CbObject());
// Verify the module folder exists in the store and ServerStateDir was wiped
CHECK(std::filesystem::exists(HydrationStore / ModuleId));
CHECK(std::filesystem::is_empty(ServerStateDir));
// Hydrate: restore server state from file store
- {
- std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
- Hydrator->Hydrate();
- }
+ Hydration->CreateHydrator(Config)->Hydrate();
// Verify restored contents match the original
VerifyTree(ServerStateDir, TestFiles);
@@ -1265,26 +1328,17 @@ TEST_CASE("hydration.file.hydrate_overwrites_existing_state")
auto TestFiles = CreateSmallTestTree(ServerStateDir);
- HydrationConfig Config;
- Config.ServerStateDir = ServerStateDir;
- Config.TempDir = HydrationTemp;
- Config.ModuleId = "testmodule";
- Config.TargetSpecification = "file://" + HydrationStore.string();
+ auto Hydration = InitHydration({.TargetSpecification = "file://" + HydrationStore.string()});
- // Dehydrate the original state
- {
- std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
- Hydrator->Dehydrate(CbObject());
- }
+ HydrationConfig Config{.ServerStateDir = ServerStateDir, .TempDir = HydrationTemp, .ModuleId = "testmodule"};
+
+ Hydration->CreateHydrator(Config)->Dehydrate(CbObject());
// Put a stale file in ServerStateDir to simulate leftover state
WriteFile(ServerStateDir / "stale.bin", CreateSemiRandomBlob(256));
// Hydrate - must wipe stale file and restore original
- {
- std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
- Hydrator->Hydrate();
- }
+ Hydration->CreateHydrator(Config)->Hydrate();
CHECK_FALSE(std::filesystem::exists(ServerStateDir / "stale.bin"));
VerifyTree(ServerStateDir, TestFiles);
@@ -1309,23 +1363,15 @@ TEST_CASE("hydration.file.excluded_files_not_dehydrated")
WriteFile(ServerStateDir / ".sentry-native" / "db.lock", CreateSemiRandomBlob(32));
WriteFile(ServerStateDir / ".sentry-native" / "breadcrumb.json", CreateSemiRandomBlob(128));
- HydrationConfig Config;
- Config.ServerStateDir = ServerStateDir;
- Config.TempDir = HydrationTemp;
- Config.ModuleId = "testmodule_excl";
- Config.TargetSpecification = "file://" + HydrationStore.string();
+ auto Hydration = InitHydration({.TargetSpecification = "file://" + HydrationStore.string()});
- {
- std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
- Hydrator->Dehydrate(CbObject());
- }
+ HydrationConfig Config{.ServerStateDir = ServerStateDir, .TempDir = HydrationTemp, .ModuleId = "testmodule_excl"};
+
+ Hydration->CreateHydrator(Config)->Dehydrate(CbObject());
// Hydrate into a clean directory
CleanDirectory(ServerStateDir, true);
- {
- std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
- Hydrator->Hydrate();
- }
+ Hydration->CreateHydrator(Config)->Hydrate();
// Normal files must be restored
VerifyTree(ServerStateDir, TestFiles);
@@ -1352,17 +1398,12 @@ TEST_CASE("hydration.file.obliterate")
const std::string ModuleId = "obliterate_test";
CreateSmallTestTree(ServerStateDir);
- HydrationConfig Config;
- Config.ServerStateDir = ServerStateDir;
- Config.TempDir = HydrationTemp;
- Config.ModuleId = ModuleId;
- Config.TargetSpecification = "file://" + HydrationStore.string();
+ auto Hydration = InitHydration({.TargetSpecification = "file://" + HydrationStore.string()});
+
+ HydrationConfig Config{.ServerStateDir = ServerStateDir, .TempDir = HydrationTemp, .ModuleId = ModuleId};
// Dehydrate so the backend store has data
- {
- std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
- Hydrator->Dehydrate(CbObject());
- }
+ Hydration->CreateHydrator(Config)->Dehydrate(CbObject());
CHECK(std::filesystem::exists(HydrationStore / ModuleId));
// Put some files back in ServerStateDir and TempDir to verify cleanup
@@ -1370,10 +1411,7 @@ TEST_CASE("hydration.file.obliterate")
WriteFile(HydrationTemp / "leftover.tmp", CreateSemiRandomBlob(64));
// Obliterate
- {
- std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
- Hydrator->Obliterate();
- }
+ Hydration->CreateHydrator(Config)->Obliterate();
// Backend store directory deleted
CHECK_FALSE(std::filesystem::exists(HydrationStore / ModuleId));
@@ -1406,6 +1444,8 @@ TEST_CASE("hydration.file.concurrent")
};
std::vector<ModuleData> Modules(kModuleCount);
+ auto Hydration = InitHydration({.TargetSpecification = "file://" + HydrationStore.string()});
+
for (int I = 0; I < kModuleCount; ++I)
{
std::string ModuleId = fmt::format("file_concurrent_{}", I);
@@ -1414,12 +1454,11 @@ TEST_CASE("hydration.file.concurrent")
CreateDirectories(StateDir);
CreateDirectories(TempPath);
- Modules[I].Config.ServerStateDir = StateDir;
- Modules[I].Config.TempDir = TempPath;
- Modules[I].Config.ModuleId = ModuleId;
- Modules[I].Config.TargetSpecification = "file://" + HydrationStore.string();
- Modules[I].Config.Threading = Threading.Options;
- Modules[I].Files = CreateSmallTestTree(StateDir);
+ Modules[I].Config.ServerStateDir = StateDir;
+ Modules[I].Config.TempDir = TempPath;
+ Modules[I].Config.ModuleId = ModuleId;
+ Modules[I].Config.Threading = Threading.Options;
+ Modules[I].Files = CreateSmallTestTree(StateDir);
}
// Concurrent dehydrate
@@ -1431,9 +1470,8 @@ TEST_CASE("hydration.file.concurrent")
for (int I = 0; I < kModuleCount; ++I)
{
- Work.ScheduleWork(Pool, [&Config = Modules[I].Config](std::atomic<bool>&) {
- std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
- Hydrator->Dehydrate(CbObject());
+ Work.ScheduleWork(Pool, [&Hydration, &Config = Modules[I].Config](std::atomic<bool>&) {
+ Hydration->CreateHydrator(Config)->Dehydrate(CbObject());
});
}
Work.Wait();
@@ -1449,9 +1487,8 @@ TEST_CASE("hydration.file.concurrent")
for (int I = 0; I < kModuleCount; ++I)
{
- Work.ScheduleWork(Pool, [&Config = Modules[I].Config](std::atomic<bool>&) {
- std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
- Hydrator->Hydrate();
+ Work.ScheduleWork(Pool, [&Hydration, &Config = Modules[I].Config](std::atomic<bool>&) {
+ Hydration->CreateHydrator(Config)->Hydrate();
});
}
Work.Wait();
@@ -1490,10 +1527,7 @@ TEST_CASE("hydration.s3.dehydrate_hydrate")
CreateDirectories(ServerStateDir);
CreateDirectories(HydrationTemp);
- HydrationConfig Config;
- Config.ServerStateDir = ServerStateDir;
- Config.TempDir = HydrationTemp;
- Config.ModuleId = "s3test_roundtrip";
+ HydrationBase::Configuration BaseConfig;
{
std::string ConfigJson =
fmt::format(R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test","endpoint":"{}","path-style":true}}}})",
@@ -1501,39 +1535,30 @@ TEST_CASE("hydration.s3.dehydrate_hydrate")
std::string ParseError;
CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError);
ZEN_ASSERT(ParseError.empty() && Root.IsObject());
- Config.Options = std::move(Root).AsObject();
+ BaseConfig.Options = std::move(Root).AsObject();
}
+ auto Hydration = InitHydration(BaseConfig);
+
+ HydrationConfig Config{.ServerStateDir = ServerStateDir, .TempDir = HydrationTemp, .ModuleId = "s3test_roundtrip"};
// Hydrate with no prior S3 state (first-boot path). Pre-populate ServerStateDir
// with a stale file to confirm the cleanup branch wipes it.
WriteFile(ServerStateDir / "stale.bin", CreateSemiRandomBlob(256));
- {
- std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
- Hydrator->Hydrate();
- }
+ Hydration->CreateHydrator(Config)->Hydrate();
CHECK(std::filesystem::is_empty(ServerStateDir));
// v1: dehydrate without a marker file
CreateSmallTestTree(ServerStateDir);
- {
- std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
- Hydrator->Dehydrate(CbObject());
- }
+ Hydration->CreateHydrator(Config)->Dehydrate(CbObject());
// v2: dehydrate WITH a marker file that only v2 has
CreateSmallTestTree(ServerStateDir);
WriteFile(ServerStateDir / "v2marker.bin", CreateSemiRandomBlob(64));
- {
- std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
- Hydrator->Dehydrate(CbObject());
- }
+ Hydration->CreateHydrator(Config)->Dehydrate(CbObject());
// Hydrate must restore v2 (the latest dehydrated state)
CleanDirectory(ServerStateDir, true);
- {
- std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
- Hydrator->Hydrate();
- }
+ Hydration->CreateHydrator(Config)->Hydrate();
// v2 marker must be present - confirms the second dehydration overwrote the first
CHECK(std::filesystem::exists(ServerStateDir / "v2marker.bin"));
@@ -1568,6 +1593,18 @@ TEST_CASE("hydration.s3.concurrent")
};
std::vector<ModuleData> Modules(kModuleCount);
+ HydrationBase::Configuration BaseConfig;
+ {
+ std::string ConfigJson =
+ fmt::format(R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test","endpoint":"{}","path-style":true}}}})",
+ Minio.Endpoint());
+ std::string ParseError;
+ CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError);
+ ZEN_ASSERT(ParseError.empty() && Root.IsObject());
+ BaseConfig.Options = std::move(Root).AsObject();
+ }
+ auto Hydration = InitHydration(BaseConfig);
+
for (int I = 0; I < kModuleCount; ++I)
{
std::string ModuleId = fmt::format("s3_concurrent_{}", I);
@@ -1580,16 +1617,7 @@ TEST_CASE("hydration.s3.concurrent")
Modules[I].Config.TempDir = TempPath;
Modules[I].Config.ModuleId = ModuleId;
Modules[I].Config.Threading = Threading.Options;
- {
- std::string ConfigJson =
- fmt::format(R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test","endpoint":"{}","path-style":true}}}})",
- Minio.Endpoint());
- std::string ParseError;
- CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError);
- ZEN_ASSERT(ParseError.empty() && Root.IsObject());
- Modules[I].Config.Options = std::move(Root).AsObject();
- }
- Modules[I].Files = CreateTestTree(StateDir);
+ Modules[I].Files = CreateTestTree(StateDir);
}
// Concurrent dehydrate
@@ -1601,9 +1629,8 @@ TEST_CASE("hydration.s3.concurrent")
for (int I = 0; I < kModuleCount; ++I)
{
- Work.ScheduleWork(Pool, [&Config = Modules[I].Config](std::atomic<bool>&) {
- std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
- Hydrator->Dehydrate(CbObject());
+ Work.ScheduleWork(Pool, [&Hydration, &Config = Modules[I].Config](std::atomic<bool>&) {
+ Hydration->CreateHydrator(Config)->Dehydrate(CbObject());
});
}
Work.Wait();
@@ -1619,10 +1646,9 @@ TEST_CASE("hydration.s3.concurrent")
for (int I = 0; I < kModuleCount; ++I)
{
- Work.ScheduleWork(Pool, [&Config = Modules[I].Config](std::atomic<bool>&) {
+ Work.ScheduleWork(Pool, [&Hydration, &Config = Modules[I].Config](std::atomic<bool>&) {
CleanDirectory(Config.ServerStateDir, true);
- std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
- Hydrator->Hydrate();
+ Hydration->CreateHydrator(Config)->Hydrate();
});
}
Work.Wait();
@@ -1656,10 +1682,7 @@ TEST_CASE("hydration.s3.obliterate")
const std::string ModuleId = "s3test_obliterate";
- HydrationConfig Config;
- Config.ServerStateDir = ServerStateDir;
- Config.TempDir = HydrationTemp;
- Config.ModuleId = ModuleId;
+ HydrationBase::Configuration BaseConfig;
{
std::string ConfigJson =
fmt::format(R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test","endpoint":"{}","path-style":true}}}})",
@@ -1667,15 +1690,15 @@ TEST_CASE("hydration.s3.obliterate")
std::string ParseError;
CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError);
ZEN_ASSERT(ParseError.empty() && Root.IsObject());
- Config.Options = std::move(Root).AsObject();
+ BaseConfig.Options = std::move(Root).AsObject();
}
+ auto Hydration = InitHydration(BaseConfig);
+
+ HydrationConfig Config{.ServerStateDir = ServerStateDir, .TempDir = HydrationTemp, .ModuleId = ModuleId};
// Dehydrate to populate backend
CreateSmallTestTree(ServerStateDir);
- {
- std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
- Hydrator->Dehydrate(CbObject());
- }
+ Hydration->CreateHydrator(Config)->Dehydrate(CbObject());
auto ListModuleObjects = [&]() {
S3ClientOptions Opts;
@@ -1696,10 +1719,7 @@ TEST_CASE("hydration.s3.obliterate")
WriteFile(HydrationTemp / "leftover.tmp", CreateSemiRandomBlob(64));
// Obliterate
- {
- std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
- Hydrator->Obliterate();
- }
+ Hydration->CreateHydrator(Config)->Obliterate();
// Verify S3 objects deleted
CHECK(ListModuleObjects().Objects.empty());
@@ -1731,10 +1751,7 @@ TEST_CASE("hydration.s3.config_overrides")
{
auto TestFiles = CreateSmallTestTree(ServerStateDir);
- HydrationConfig Config;
- Config.ServerStateDir = ServerStateDir;
- Config.TempDir = HydrationTemp;
- Config.ModuleId = "s3test_prefix";
+ HydrationBase::Configuration BaseConfig;
{
std::string ConfigJson = fmt::format(
R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test/team/project","endpoint":"{}","path-style":true}}}})",
@@ -1742,20 +1759,17 @@ TEST_CASE("hydration.s3.config_overrides")
std::string ParseError;
CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError);
ZEN_ASSERT(ParseError.empty() && Root.IsObject());
- Config.Options = std::move(Root).AsObject();
+ BaseConfig.Options = std::move(Root).AsObject();
}
+ auto Hydration = InitHydration(BaseConfig);
- {
- std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
- Hydrator->Dehydrate(CbObject());
- }
+ HydrationConfig Config{.ServerStateDir = ServerStateDir, .TempDir = HydrationTemp, .ModuleId = "s3test_prefix"};
+
+ Hydration->CreateHydrator(Config)->Dehydrate(CbObject());
CleanDirectory(ServerStateDir, true);
- {
- std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
- Hydrator->Hydrate();
- }
+ Hydration->CreateHydrator(Config)->Hydrate();
VerifyTree(ServerStateDir, TestFiles);
}
@@ -1768,10 +1782,7 @@ TEST_CASE("hydration.s3.config_overrides")
ScopedEnvVar EnvRegion("AWS_DEFAULT_REGION", "wrong-region");
- HydrationConfig Config;
- Config.ServerStateDir = ServerStateDir;
- Config.TempDir = HydrationTemp;
- Config.ModuleId = "s3test_region_override";
+ HydrationBase::Configuration BaseConfig;
{
std::string ConfigJson = fmt::format(
R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test","endpoint":"{}","path-style":true,"region":"us-east-1"}}}})",
@@ -1779,20 +1790,17 @@ TEST_CASE("hydration.s3.config_overrides")
std::string ParseError;
CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError);
ZEN_ASSERT(ParseError.empty() && Root.IsObject());
- Config.Options = std::move(Root).AsObject();
+ BaseConfig.Options = std::move(Root).AsObject();
}
+ auto Hydration = InitHydration(BaseConfig);
- {
- std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
- Hydrator->Dehydrate(CbObject());
- }
+ HydrationConfig Config{.ServerStateDir = ServerStateDir, .TempDir = HydrationTemp, .ModuleId = "s3test_region_override"};
+
+ Hydration->CreateHydrator(Config)->Dehydrate(CbObject());
CleanDirectory(ServerStateDir, true);
- {
- std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
- Hydrator->Hydrate();
- }
+ Hydration->CreateHydrator(Config)->Hydrate();
VerifyTree(ServerStateDir, TestFiles);
}
@@ -1822,25 +1830,26 @@ TEST_CASE("hydration.s3.dehydrate_hydrate.performance" * doctest::skip())
TestThreading Threading(4);
- HydrationConfig Config;
- Config.ServerStateDir = ServerStateDir;
- Config.TempDir = HydrationTemp;
- Config.ModuleId = ModuleId;
- Config.Threading = Threading.Options;
- std::string ConfigJson =
- fmt::format(R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test","endpoint":"{}","path-style":true}}}})",
- Minio.Endpoint());
- std::string ParseError;
- CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError);
- ZEN_ASSERT(ParseError.empty() && Root.IsObject());
- Config.Options = std::move(Root).AsObject();
-
- // Dehydrate: upload server state to MinIO
+ HydrationBase::Configuration BaseConfig;
{
- ZEN_INFO("============== DEHYDRATE ==============");
- std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
- Hydrator->Dehydrate(CbObject());
+ std::string ConfigJson =
+ fmt::format(R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test","endpoint":"{}","path-style":true}}}})",
+ Minio.Endpoint());
+ std::string ParseError;
+ CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError);
+ ZEN_ASSERT(ParseError.empty() && Root.IsObject());
+ BaseConfig.Options = std::move(Root).AsObject();
}
+ auto Hydration = InitHydration(BaseConfig);
+
+ HydrationConfig Config{.ServerStateDir = ServerStateDir,
+ .TempDir = HydrationTemp,
+ .ModuleId = ModuleId,
+ .Threading = Threading.Options};
+
+ // Dehydrate: upload server state to MinIO
+ ZEN_INFO("============== DEHYDRATE ==============");
+ Hydration->CreateHydrator(Config)->Dehydrate(CbObject());
for (size_t I = 0; I < 1; I++)
{
@@ -1849,11 +1858,8 @@ TEST_CASE("hydration.s3.dehydrate_hydrate.performance" * doctest::skip())
CHECK(std::filesystem::is_empty(ServerStateDir));
// Hydrate: download from MinIO back to server state
- {
- ZEN_INFO("=============== HYDRATE ===============");
- std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
- Hydrator->Hydrate();
- }
+ ZEN_INFO("=============== HYDRATE ===============");
+ Hydration->CreateHydrator(Config)->Hydrate();
}
}
@@ -1880,23 +1886,16 @@ TEST_CASE("hydration.file.incremental")
TestThreading Threading(4);
- HydrationConfig Config;
- Config.ServerStateDir = ServerStateDir;
- Config.TempDir = HydrationTemp;
- Config.ModuleId = ModuleId;
- Config.TargetSpecification = "file://" + HydrationStore.string();
- Config.Threading = Threading.Options;
+ auto Hydration = InitHydration({.TargetSpecification = "file://" + HydrationStore.string()});
- std::unique_ptr<StorageBase> Storage = std::make_unique<FileStorage>();
- std::unique_ptr<HydrationStrategyBase> Hydrator = std::make_unique<IncrementalHydrator>(std::move(Storage));
+ HydrationConfig Config{.ServerStateDir = ServerStateDir,
+ .TempDir = HydrationTemp,
+ .ModuleId = ModuleId,
+ .Threading = Threading.Options};
// Hydrate with no prior state
- CbObject HydrationState;
- {
- Hydrator->Configure(Config);
- HydrationState = Hydrator->Hydrate();
- CHECK_FALSE(HydrationState);
- }
+ CbObject HydrationState = Hydration->CreateHydrator(Config)->Hydrate();
+ CHECK_FALSE(HydrationState);
# ifdef REAL_DATA_PATH
ZEN_INFO("Writing state data...");
@@ -1906,54 +1905,33 @@ TEST_CASE("hydration.file.incremental")
// Create test files and dehydrate
auto TestFiles = CreateTestTree(ServerStateDir);
# endif
- {
- Hydrator->Configure(Config);
- Hydrator->Dehydrate(HydrationState);
- }
+ Hydration->CreateHydrator(Config)->Dehydrate(HydrationState);
CHECK(std::filesystem::is_empty(ServerStateDir));
- // Hydrate: restore from S3
- {
- Hydrator->Configure(Config);
- HydrationState = Hydrator->Hydrate();
- }
+ // Hydrate: restore from file store
+ HydrationState = Hydration->CreateHydrator(Config)->Hydrate();
# ifndef REAL_DATA_PATH
VerifyTree(ServerStateDir, TestFiles);
# endif
// Dehydrate again with cached state (should skip re-uploading unchanged files)
- {
- Hydrator->Configure(Config);
- Hydrator->Dehydrate(HydrationState);
- }
+ Hydration->CreateHydrator(Config)->Dehydrate(HydrationState);
CHECK(std::filesystem::is_empty(ServerStateDir));
// Hydrate one more time to confirm second dehydrate produced valid state
- {
- Hydrator->Configure(Config);
- HydrationState = Hydrator->Hydrate();
- }
+ HydrationState = Hydration->CreateHydrator(Config)->Hydrate();
// Replace files and dehydrate
TestFiles = CreateTestTree(ServerStateDir);
- {
- Hydrator->Configure(Config);
- Hydrator->Dehydrate(HydrationState);
- }
+ Hydration->CreateHydrator(Config)->Dehydrate(HydrationState);
// Hydrate one more time to confirm second dehydrate produced valid state
- {
- Hydrator->Configure(Config);
- HydrationState = Hydrator->Hydrate();
- }
+ HydrationState = Hydration->CreateHydrator(Config)->Hydrate();
# ifndef REAL_DATA_PATH
VerifyTree(ServerStateDir, TestFiles);
# endif // 0
// Dehydrate, nothing touched - no hashing, no upload
- {
- Hydrator->Configure(Config);
- Hydrator->Dehydrate(HydrationState);
- }
+ Hydration->CreateHydrator(Config)->Dehydrate(HydrationState);
}
// ---------------------------------------------------------------------------
@@ -1986,11 +1964,7 @@ TEST_CASE("hydration.s3.incremental")
TestThreading Threading(8);
- HydrationConfig Config;
- Config.ServerStateDir = ServerStateDir;
- Config.TempDir = HydrationTemp;
- Config.ModuleId = ModuleId;
- Config.Threading = Threading.Options;
+ HydrationBase::Configuration BaseConfig;
{
std::string ConfigJson =
fmt::format(R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test","endpoint":"{}","path-style":true}}}})",
@@ -1998,19 +1972,18 @@ TEST_CASE("hydration.s3.incremental")
std::string ParseError;
CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError);
ZEN_ASSERT(ParseError.empty() && Root.IsObject());
- Config.Options = std::move(Root).AsObject();
+ BaseConfig.Options = std::move(Root).AsObject();
}
+ auto Hydration = InitHydration(BaseConfig);
- std::unique_ptr<StorageBase> Storage = std::make_unique<S3Storage>();
- std::unique_ptr<HydrationStrategyBase> Hydrator = std::make_unique<IncrementalHydrator>(std::move(Storage));
+ HydrationConfig Config{.ServerStateDir = ServerStateDir,
+ .TempDir = HydrationTemp,
+ .ModuleId = ModuleId,
+ .Threading = Threading.Options};
// Hydrate with no prior state
- CbObject HydrationState;
- {
- Hydrator->Configure(Config);
- HydrationState = Hydrator->Hydrate();
- CHECK_FALSE(HydrationState);
- }
+ CbObject HydrationState = Hydration->CreateHydrator(Config)->Hydrate();
+ CHECK_FALSE(HydrationState);
# ifdef REAL_DATA_PATH
ZEN_INFO("Writing state data...");
@@ -2020,83 +1993,51 @@ TEST_CASE("hydration.s3.incremental")
// Create test files and dehydrate
auto TestFiles = CreateTestTree(ServerStateDir);
# endif
- {
- Hydrator->Configure(Config);
- Hydrator->Dehydrate(HydrationState);
- }
+ Hydration->CreateHydrator(Config)->Dehydrate(HydrationState);
CHECK(std::filesystem::is_empty(ServerStateDir));
// Hydrate: restore from S3
- {
- Hydrator->Configure(Config);
- HydrationState = Hydrator->Hydrate();
- }
+ HydrationState = Hydration->CreateHydrator(Config)->Hydrate();
# ifndef REAL_DATA_PATH
VerifyTree(ServerStateDir, TestFiles);
# endif
// Dehydrate again with cached state (should skip re-uploading unchanged files)
- {
- Hydrator->Configure(Config);
- Hydrator->Dehydrate(HydrationState);
- }
+ Hydration->CreateHydrator(Config)->Dehydrate(HydrationState);
CHECK(std::filesystem::is_empty(ServerStateDir));
// Hydrate one more time to confirm second dehydrate produced valid state
- {
- Hydrator->Configure(Config);
- HydrationState = Hydrator->Hydrate();
- }
+ HydrationState = Hydration->CreateHydrator(Config)->Hydrate();
// Replace files and dehydrate
TestFiles = CreateTestTree(ServerStateDir);
- {
- Hydrator->Configure(Config);
- Hydrator->Dehydrate(HydrationState);
- }
+ Hydration->CreateHydrator(Config)->Dehydrate(HydrationState);
// Hydrate one more time to confirm second dehydrate produced valid state
- {
- Hydrator->Configure(Config);
- HydrationState = Hydrator->Hydrate();
- }
+ HydrationState = Hydration->CreateHydrator(Config)->Hydrate();
# ifndef REAL_DATA_PATH
VerifyTree(ServerStateDir, TestFiles);
# endif // 0
// Dehydrate, nothing touched - no hashing, no upload
- {
- Hydrator->Configure(Config);
- Hydrator->Dehydrate(HydrationState);
- }
+ Hydration->CreateHydrator(Config)->Dehydrate(HydrationState);
}
TEST_CASE("hydration.create_hydrator_rejects_invalid_config")
{
- ScopedTemporaryDirectory TempDir;
-
- HydrationConfig Config;
- Config.ServerStateDir = TempDir.Path() / "state";
- Config.TempDir = TempDir.Path() / "temp";
- Config.ModuleId = "invalid_test";
-
// Unknown TargetSpecification prefix
- Config.TargetSpecification = "ftp://somewhere";
- CHECK_THROWS(CreateHydrator(Config));
+ CHECK_THROWS(InitHydration({.TargetSpecification = "ftp://somewhere"}));
// Unknown Options type
- Config.TargetSpecification.clear();
{
std::string ParseError;
CbFieldIterator Root = LoadCompactBinaryFromJson(R"({"type":"dynamodb"})", ParseError);
ZEN_ASSERT(ParseError.empty() && Root.IsObject());
- Config.Options = std::move(Root).AsObject();
+ CHECK_THROWS(InitHydration({.Options = std::move(Root).AsObject()}));
}
- CHECK_THROWS(CreateHydrator(Config));
// Empty Options (no type field)
- Config.Options = CbObject();
- CHECK_THROWS(CreateHydrator(Config));
+ CHECK_THROWS(InitHydration({}));
}
TEST_SUITE_END();
diff --git a/src/zenserver/hub/hydration.h b/src/zenserver/hub/hydration.h
index fc2f309b2..0455dda91 100644
--- a/src/zenserver/hub/hydration.h
+++ b/src/zenserver/hub/hydration.h
@@ -4,8 +4,11 @@
#include <zencore/compactbinary.h>
+#include <atomic>
#include <filesystem>
+#include <memory>
#include <optional>
+#include <string>
namespace zen {
@@ -19,16 +22,12 @@ struct HydrationConfig
std::filesystem::path TempDir;
// Module ID of the server state being hydrated/dehydrated
std::string ModuleId;
- // Back-end specific target specification (e.g. S3 bucket, file path, etc)
- std::string TargetSpecification;
- // Full config object when using --hub-hydration-target-config (mutually exclusive with TargetSpecification)
- CbObject Options;
struct ThreadingOptions
{
- WorkerThreadPool* WorkerPool;
- std::atomic<bool>* AbortFlag;
- std::atomic<bool>* PauseFlag;
+ WorkerThreadPool* WorkerPool = nullptr;
+ std::atomic<bool>* AbortFlag = nullptr;
+ std::atomic<bool>* PauseFlag = nullptr;
};
// External threading for parallel I/O and hashing. If not set, work runs inline on the caller's thread.
@@ -41,15 +40,11 @@ struct HydrationConfig
* An instance of this interface is used to perform hydration OR
* dehydration of server state. It's expected to be used only once
* and not reused.
- *
*/
struct HydrationStrategyBase
{
virtual ~HydrationStrategyBase() = default;
- // Set up the hydration target from Config. Must be called before Hydrate/Dehydrate.
- virtual void Configure(const HydrationConfig& Config) = 0;
-
// Upload server state to the configured target. ServerStateDir is wiped on success.
// On failure, ServerStateDir is left intact.
virtual void Dehydrate(const CbObject& CachedState) = 0;
@@ -62,8 +57,36 @@ struct HydrationStrategyBase
virtual void Obliterate() = 0;
};
-// Create a configured hydrator based on Config. Ready to call Hydrate/Dehydrate immediately.
-std::unique_ptr<HydrationStrategyBase> CreateHydrator(const HydrationConfig& Config);
+/**
+ * @brief Hub-wide hydration backend
+ *
+ * Constructed once per hub via InitHydration. Holds the shared connection / client /
+ * credentials state for the configured backend (e.g. a single S3 client and IMDS
+ * credential provider shared by all modules). CreateHydrator produces a ready-to-use
+ * per-module HydrationStrategyBase that references the shared state - no per-module
+ * backend setup cost.
+ */
+class HydrationBase
+{
+public:
+ struct Configuration
+ {
+ // Back-end specific target specification (e.g. "s3://bucket/prefix", "file:///path")
+ std::string TargetSpecification;
+ // Full config object (mutually exclusive with TargetSpecification)
+ CbObject Options;
+ };
+
+ virtual ~HydrationBase() = default;
+
+ // Create a configured per-module hydrator, ready to call Hydrate/Dehydrate/Obliterate.
+ virtual std::unique_ptr<HydrationStrategyBase> CreateHydrator(const HydrationConfig& Config) = 0;
+};
+
+// Factory: parses Config and returns the concrete backend (FileHydration or S3Hydration).
+// Throws zen::runtime_error if the config cannot be resolved to a known backend or if
+// backend-specific validation fails.
+std::unique_ptr<HydrationBase> InitHydration(const HydrationBase::Configuration& Config);
#if ZEN_WITH_TESTS
void hydration_forcelink();
diff --git a/src/zenserver/hub/storageserverinstance.cpp b/src/zenserver/hub/storageserverinstance.cpp
index af2c19113..97edc5223 100644
--- a/src/zenserver/hub/storageserverinstance.cpp
+++ b/src/zenserver/hub/storageserverinstance.cpp
@@ -11,8 +11,12 @@
namespace zen {
-StorageServerInstance::StorageServerInstance(ZenServerEnvironment& RunEnvironment, const Configuration& Config, std::string_view ModuleId)
-: m_Config(Config)
+StorageServerInstance::StorageServerInstance(ZenServerEnvironment& RunEnvironment,
+ HydrationBase& Hydration,
+ const Configuration& Config,
+ std::string_view ModuleId)
+: m_Hydration(Hydration)
+, m_Config(Config)
, m_ModuleId(ModuleId)
, m_ServerInstance(RunEnvironment, ZenServerInstance::ServerMode::kStorageServer)
{
@@ -156,7 +160,7 @@ StorageServerInstance::ObliterateLocked()
std::atomic<bool> AbortFlag{false};
std::atomic<bool> PauseFlag{false};
HydrationConfig Config = MakeHydrationConfig(AbortFlag, PauseFlag);
- std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
+ std::unique_ptr<HydrationStrategyBase> Hydrator = m_Hydration.CreateHydrator(Config);
Hydrator->Obliterate();
}
@@ -204,7 +208,7 @@ StorageServerInstance::Hydrate()
std::atomic<bool> AbortFlag{false};
std::atomic<bool> PauseFlag{false};
HydrationConfig Config = MakeHydrationConfig(AbortFlag, PauseFlag);
- std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
+ std::unique_ptr<HydrationStrategyBase> Hydrator = m_Hydration.CreateHydrator(Config);
m_HydrationState = Hydrator->Hydrate();
}
@@ -214,18 +218,14 @@ StorageServerInstance::Dehydrate()
std::atomic<bool> AbortFlag{false};
std::atomic<bool> PauseFlag{false};
HydrationConfig Config = MakeHydrationConfig(AbortFlag, PauseFlag);
- std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
+ std::unique_ptr<HydrationStrategyBase> Hydrator = m_Hydration.CreateHydrator(Config);
Hydrator->Dehydrate(m_HydrationState);
}
HydrationConfig
StorageServerInstance::MakeHydrationConfig(std::atomic<bool>& AbortFlag, std::atomic<bool>& PauseFlag)
{
- HydrationConfig Config{.ServerStateDir = m_Config.StateDir,
- .TempDir = m_Config.TempDir,
- .ModuleId = m_ModuleId,
- .TargetSpecification = m_Config.HydrationTargetSpecification,
- .Options = m_Config.HydrationOptions};
+ HydrationConfig Config{.ServerStateDir = m_Config.StateDir, .TempDir = m_Config.TempDir, .ModuleId = m_ModuleId};
if (m_Config.OptionalWorkerPool)
{
Config.Threading.emplace(
diff --git a/src/zenserver/hub/storageserverinstance.h b/src/zenserver/hub/storageserverinstance.h
index 80f8a5016..c5917afc9 100644
--- a/src/zenserver/hub/storageserverinstance.h
+++ b/src/zenserver/hub/storageserverinstance.h
@@ -29,8 +29,6 @@ public:
uint16_t BasePort;
std::filesystem::path StateDir;
std::filesystem::path TempDir;
- std::string HydrationTargetSpecification;
- CbObject HydrationOptions;
uint32_t HttpThreadCount = 0; // Automatic
int CoreLimit = 0; // Automatic
std::filesystem::path ConfigPath;
@@ -42,7 +40,10 @@ public:
WorkerThreadPool* OptionalWorkerPool = nullptr;
};
- StorageServerInstance(ZenServerEnvironment& RunEnvironment, const Configuration& Config, std::string_view ModuleId);
+ StorageServerInstance(ZenServerEnvironment& RunEnvironment,
+ HydrationBase& Hydration,
+ const Configuration& Config,
+ std::string_view ModuleId);
~StorageServerInstance();
inline std::string_view GetModuleId() const { return m_ModuleId; }
@@ -140,6 +141,7 @@ private:
void WakeLocked();
mutable RwLock m_Lock;
+ HydrationBase& m_Hydration;
const Configuration m_Config;
std::string m_ModuleId;
ZenServerInstance m_ServerInstance;