aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2026-04-07 16:53:55 +0200
committerGitHub Enterprise <[email protected]>2026-04-07 16:53:55 +0200
commit4d8fae7636ad45900f22253621b9f7d51d0b646e (patch)
tree37fdf97870f216d465b4cb66563c5c366262483d /src/zenserver
parentdisable zencompute in bundle step (diff)
downloadzen-4d8fae7636ad45900f22253621b9f7d51d0b646e.tar.xz
zen-4d8fae7636ad45900f22253621b9f7d51d0b646e.zip
incremental dehydrate (#921)
- Feature: Incremental CAS-based hydration/dehydration replacing the previous full-copy approach - Feature: S3 hydration backend with multipart upload/download support - Feature: Configurable thread pools for hub instance provisioning and hydration `--hub-instance-provision-threads` defaults to `max(cpu_count / 4, 2)`. Set to 0 for synchronous operation. `--hub-hydration-threads` defaults to `max(cpu_count / 4, 2)`. Set to 0 for synchronous operation. - Improvement: Hub triggers GC on instance before deprovisioning to compact storage before dehydration - Improvement: GC status now reports pending triggers as running - Improvement: S3 client debug logging gated behind verbose mode to reduce log noise at default verbosity - Improvement: Hub dashboard Resources tile now shows total memory - Improvement: `filesystemutils` moved from `zenremotestore` to `zenutil` for broader reuse - Improvement: Hub uses separate provision and hydration worker pools to avoid deadlocks - Improvement: Hibernate/wake/deprovision on non-existent or already-in-target-state modules are idempotent - Improvement: `ScopedTemporaryDirectory` with empty path now creates a temporary directory instead of asserting
Diffstat (limited to 'src/zenserver')
-rw-r--r--src/zenserver/frontend/html/pages/hub.js1
-rw-r--r--src/zenserver/hub/hub.cpp432
-rw-r--r--src/zenserver/hub/hub.h10
-rw-r--r--src/zenserver/hub/hydration.cpp1916
-rw-r--r--src/zenserver/hub/hydration.h25
-rw-r--r--src/zenserver/hub/storageserverinstance.cpp22
-rw-r--r--src/zenserver/hub/storageserverinstance.h6
-rw-r--r--src/zenserver/hub/zenhubserver.cpp36
-rw-r--r--src/zenserver/hub/zenhubserver.h25
9 files changed, 1509 insertions, 964 deletions
diff --git a/src/zenserver/frontend/html/pages/hub.js b/src/zenserver/frontend/html/pages/hub.js
index c9652f31e..1b84e46ec 100644
--- a/src/zenserver/frontend/html/pages/hub.js
+++ b/src/zenserver/frontend/html/pages/hub.js
@@ -241,6 +241,7 @@ export class Page extends ZenPage
const right = columns.tag().classify("tile-metrics");
this._metric(right, Friendly.bytes(mem_used), "memory used", true);
+ this._metric(right, Friendly.bytes(machine.memory_total_mib * 1024 * 1024), "memory total");
if (mem_limit > 0) { this._metric(right, Friendly.bytes(mem_limit), "memory limit"); }
if (machine.virtual_memory_total_mib > 0)
{
diff --git a/src/zenserver/hub/hub.cpp b/src/zenserver/hub/hub.cpp
index 82f4a00ba..b2ebcd16f 100644
--- a/src/zenserver/hub/hub.cpp
+++ b/src/zenserver/hub/hub.cpp
@@ -170,18 +170,19 @@ Hub::GetMachineMetrics(SystemMetrics& OutSystemMetrict, DiskSpace& OutDiskSpace)
//////////////////////////////////////////////////////////////////////////
-Hub::Hub(const Configuration& Config,
- ZenServerEnvironment&& RunEnvironment,
- WorkerThreadPool* OptionalWorkerPool,
- AsyncModuleStateChangeCallbackFunc&& ModuleStateChangeCallback)
+Hub::Hub(const Configuration& Config, ZenServerEnvironment&& RunEnvironment, AsyncModuleStateChangeCallbackFunc&& ModuleStateChangeCallback)
: m_Config(Config)
, m_RunEnvironment(std::move(RunEnvironment))
-, m_WorkerPool(OptionalWorkerPool)
+, m_WorkerPool(Config.OptionalProvisionWorkerPool)
, m_BackgroundWorkLatch(1)
, m_ModuleStateChangeCallback(std::move(ModuleStateChangeCallback))
, m_ActiveInstances(Config.InstanceLimit)
, m_FreeActiveInstanceIndexes(Config.InstanceLimit)
{
+ ZEN_ASSERT_FORMAT(
+ Config.OptionalProvisionWorkerPool != Config.OptionalHydrationWorkerPool || Config.OptionalProvisionWorkerPool == nullptr,
+ "Provision and hydration worker pools must be distinct to avoid deadlocks");
+
if (!m_Config.HydrationTargetSpecification.empty())
{
m_HydrationTargetSpecification = m_Config.HydrationTargetSpecification;
@@ -329,7 +330,8 @@ Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo)
.HydrationOptions = m_HydrationOptions,
.HttpThreadCount = m_Config.InstanceHttpThreadCount,
.CoreLimit = m_Config.InstanceCoreLimit,
- .ConfigPath = m_Config.InstanceConfigPath},
+ .ConfigPath = m_Config.InstanceConfigPath,
+ .OptionalWorkerPool = m_Config.OptionalHydrationWorkerPool},
ModuleId);
#if ZEN_PLATFORM_WINDOWS
@@ -639,11 +641,11 @@ Hub::InternalDeprovision(const std::string& ModuleId, std::function<bool(ActiveI
try
{
m_WorkerPool->ScheduleWork(
- [this, ModuleId = std::string(ModuleId), ActiveInstanceIndex, Instance = std::move(SharedInstancePtr)]() mutable {
+ [this, ModuleId = std::string(ModuleId), ActiveInstanceIndex, Instance = std::move(SharedInstancePtr), OldState]() mutable {
auto _ = MakeGuard([this]() { m_BackgroundWorkLatch.CountDown(); });
try
{
- CompleteDeprovision(*Instance, ActiveInstanceIndex);
+ CompleteDeprovision(*Instance, ActiveInstanceIndex, OldState);
}
catch (const std::exception& Ex)
{
@@ -671,20 +673,47 @@ Hub::InternalDeprovision(const std::string& ModuleId, std::function<bool(ActiveI
}
else
{
- CompleteDeprovision(Instance, ActiveInstanceIndex);
+ CompleteDeprovision(Instance, ActiveInstanceIndex, OldState);
}
return Response{m_WorkerPool ? EResponseCode::Accepted : EResponseCode::Completed};
}
void
-Hub::CompleteDeprovision(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex)
+Hub::CompleteDeprovision(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex, HubInstanceState OldState)
{
const std::string ModuleId(Instance.GetModuleId());
const uint16_t Port = Instance.GetBasePort();
try
{
+ {
+ if (OldState == HubInstanceState::Provisioned)
+ {
+ ZEN_INFO("Triggering GC for module {}", ModuleId);
+
+ HttpClient GcClient(fmt::format("http://localhost:{}", Port));
+
+ HttpClient::KeyValueMap Params;
+ Params.Entries.insert({"smallobjects", "true"});
+ Params.Entries.insert({"skipcid", "false"});
+ HttpClient::Response Response = GcClient.Post("/admin/gc", HttpClient::Accept(HttpContentType::kCbObject), Params);
+ Stopwatch Timer;
+ while (Response && Timer.GetElapsedTimeMs() < 5000)
+ {
+ Response = GcClient.Get("/admin/gc", HttpClient::Accept(HttpContentType::kCbObject));
+ if (Response)
+ {
+ bool Complete = Response.AsObject()["Status"].AsString() != "Running";
+ if (Complete)
+ {
+ break;
+ }
+ Sleep(50);
+ }
+ }
+ }
+ }
Instance.Deprovision();
}
catch (const std::exception& Ex)
@@ -1139,10 +1168,14 @@ Hub::AttemptRecoverInstance(std::string_view ModuleId)
{
StorageServerInstance::ExclusiveLockedPtr Instance;
size_t ActiveInstanceIndex = (size_t)-1;
-
{
RwLock::ExclusiveLockScope _(m_Lock);
+ if (m_ShutdownFlag.load())
+ {
+ return;
+ }
+
auto It = m_InstanceLookup.find(std::string(ModuleId));
if (It == m_InstanceLookup.end())
{
@@ -1526,6 +1559,14 @@ static const HttpClientSettings kFastTimeout{.ConnectTimeout = std::chrono::mill
namespace hub_testutils {
+ struct TestHubPools
+ {
+ WorkerThreadPool ProvisionPool;
+ WorkerThreadPool HydrationPool;
+
+ explicit TestHubPools(int ThreadCount) : ProvisionPool(ThreadCount, "hub_test_prov"), HydrationPool(ThreadCount, "hub_test_hydr") {}
+ };
+
ZenServerEnvironment MakeHubEnvironment(const std::filesystem::path& BaseDir)
{
return ZenServerEnvironment(ZenServerEnvironment::Hub, GetRunningExecutablePath().parent_path(), BaseDir);
@@ -1534,9 +1575,14 @@ namespace hub_testutils {
std::unique_ptr<Hub> MakeHub(const std::filesystem::path& BaseDir,
Hub::Configuration Config = {},
Hub::AsyncModuleStateChangeCallbackFunc StateChangeCallback = {},
- WorkerThreadPool* WorkerPool = nullptr)
+ TestHubPools* Pools = nullptr)
{
- return std::make_unique<Hub>(Config, MakeHubEnvironment(BaseDir), WorkerPool, std::move(StateChangeCallback));
+ if (Pools)
+ {
+ Config.OptionalProvisionWorkerPool = &Pools->ProvisionPool;
+ Config.OptionalHydrationWorkerPool = &Pools->HydrationPool;
+ }
+ return std::make_unique<Hub>(Config, MakeHubEnvironment(BaseDir), std::move(StateChangeCallback));
}
struct CallbackRecord
@@ -1608,14 +1654,32 @@ namespace hub_testutils {
} // namespace hub_testutils
-TEST_CASE("hub.provision_basic")
+TEST_CASE("hub.provision")
{
ScopedTemporaryDirectory TempDir;
- std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path());
+
+ struct TransitionRecord
+ {
+ HubInstanceState OldState;
+ HubInstanceState NewState;
+ };
+ RwLock CaptureMutex;
+ std::vector<TransitionRecord> Transitions;
+
+ hub_testutils::StateChangeCapture CaptureInstance;
+
+ auto CaptureFunc =
+ [&](std::string_view ModuleId, const HubProvisionedInstanceInfo& Info, HubInstanceState OldState, HubInstanceState NewState) {
+ CaptureMutex.WithExclusiveLock([&]() { Transitions.push_back({OldState, NewState}); });
+ CaptureInstance.CaptureFunc()(ModuleId, Info, OldState, NewState);
+ };
+
+ std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), {}, std::move(CaptureFunc));
CHECK_EQ(HubInstance->GetInstanceCount(), 0);
CHECK_FALSE(HubInstance->Find("module_a"));
+ // Provision
HubProvisionedInstanceInfo Info;
const Hub::Response ProvisionResult = HubInstance->Provision("module_a", Info);
REQUIRE_MESSAGE(ProvisionResult.ResponseCode == Hub::EResponseCode::Completed, ProvisionResult.Message);
@@ -1632,6 +1696,15 @@ TEST_CASE("hub.provision_basic")
CHECK(ModClient.Get("/health/"));
}
+ // Verify provision callback
+ {
+ RwLock::SharedLockScope _(CaptureInstance.CallbackMutex);
+ REQUIRE_EQ(CaptureInstance.ProvisionCallbacks.size(), 1u);
+ CHECK_EQ(CaptureInstance.ProvisionCallbacks[0].ModuleId, "module_a");
+ CHECK_EQ(CaptureInstance.ProvisionCallbacks[0].Port, Info.Port);
+ }
+
+ // Deprovision
const Hub::Response DeprovisionResult = HubInstance->Deprovision("module_a");
CHECK(DeprovisionResult.ResponseCode == Hub::EResponseCode::Completed);
CHECK_EQ(HubInstance->GetInstanceCount(), 0);
@@ -1641,6 +1714,28 @@ TEST_CASE("hub.provision_basic")
HttpClient ModClient(fmt::format("http://localhost:{}", Info.Port), kFastTimeout);
CHECK(!ModClient.Get("/health/"));
}
+
+ // Verify deprovision callback
+ {
+ RwLock::SharedLockScope _(CaptureInstance.CallbackMutex);
+ REQUIRE_EQ(CaptureInstance.DeprovisionCallbacks.size(), 1u);
+ CHECK_EQ(CaptureInstance.DeprovisionCallbacks[0].ModuleId, "module_a");
+ CHECK_EQ(CaptureInstance.DeprovisionCallbacks[0].Port, Info.Port);
+ }
+
+ // Verify full transition sequence
+ {
+ RwLock::SharedLockScope _(CaptureMutex);
+ REQUIRE_EQ(Transitions.size(), 4u);
+ CHECK_EQ(Transitions[0].OldState, HubInstanceState::Unprovisioned);
+ CHECK_EQ(Transitions[0].NewState, HubInstanceState::Provisioning);
+ CHECK_EQ(Transitions[1].OldState, HubInstanceState::Provisioning);
+ CHECK_EQ(Transitions[1].NewState, HubInstanceState::Provisioned);
+ CHECK_EQ(Transitions[2].OldState, HubInstanceState::Provisioned);
+ CHECK_EQ(Transitions[2].NewState, HubInstanceState::Deprovisioning);
+ CHECK_EQ(Transitions[3].OldState, HubInstanceState::Deprovisioning);
+ CHECK_EQ(Transitions[3].NewState, HubInstanceState::Unprovisioned);
+ }
}
TEST_CASE("hub.provision_config")
@@ -1693,92 +1788,6 @@ TEST_CASE("hub.provision_config")
}
}
-TEST_CASE("hub.provision_callbacks")
-{
- ScopedTemporaryDirectory TempDir;
-
- hub_testutils::StateChangeCapture CaptureInstance;
-
- std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), {}, CaptureInstance.CaptureFunc());
-
- HubProvisionedInstanceInfo Info;
-
- const Hub::Response ProvisionResult = HubInstance->Provision("cb_module", Info);
- REQUIRE_MESSAGE(ProvisionResult.ResponseCode == Hub::EResponseCode::Completed, ProvisionResult.Message);
-
- {
- RwLock::SharedLockScope _(CaptureInstance.CallbackMutex);
- REQUIRE_EQ(CaptureInstance.ProvisionCallbacks.size(), 1u);
- CHECK_EQ(CaptureInstance.ProvisionCallbacks[0].ModuleId, "cb_module");
- CHECK_EQ(CaptureInstance.ProvisionCallbacks[0].Port, Info.Port);
- CHECK_NE(CaptureInstance.ProvisionCallbacks[0].Port, 0);
- }
-
- {
- HttpClient ModClient(fmt::format("http://localhost:{}", Info.Port), kFastTimeout);
- CHECK(ModClient.Get("/health/"));
- }
-
- const Hub::Response DeprovisionResult = HubInstance->Deprovision("cb_module");
- CHECK(DeprovisionResult.ResponseCode == Hub::EResponseCode::Completed);
-
- {
- HttpClient ModClient(fmt::format("http://localhost:{}", Info.Port), kFastTimeout);
- CHECK(!ModClient.Get("/health/"));
- }
-
- {
- RwLock::SharedLockScope _(CaptureInstance.CallbackMutex);
- REQUIRE_EQ(CaptureInstance.DeprovisionCallbacks.size(), 1u);
- CHECK_EQ(CaptureInstance.DeprovisionCallbacks[0].ModuleId, "cb_module");
- CHECK_EQ(CaptureInstance.DeprovisionCallbacks[0].Port, Info.Port);
- CHECK_EQ(CaptureInstance.DeprovisionCallbacks.size(), 1u);
- }
-}
-
-TEST_CASE("hub.provision_callback_sequence")
-{
- ScopedTemporaryDirectory TempDir;
-
- struct TransitionRecord
- {
- HubInstanceState OldState;
- HubInstanceState NewState;
- };
- RwLock CaptureMutex;
- std::vector<TransitionRecord> Transitions;
-
- auto CaptureFunc =
- [&](std::string_view ModuleId, const HubProvisionedInstanceInfo& Info, HubInstanceState OldState, HubInstanceState NewState) {
- ZEN_UNUSED(ModuleId);
- ZEN_UNUSED(Info);
- CaptureMutex.WithExclusiveLock([&]() { Transitions.push_back({OldState, NewState}); });
- };
-
- std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), {}, std::move(CaptureFunc));
-
- HubProvisionedInstanceInfo Info;
- {
- const Hub::Response R = HubInstance->Provision("seq_module", Info);
- REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message);
- }
- {
- const Hub::Response R = HubInstance->Deprovision("seq_module");
- REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message);
- }
-
- RwLock::SharedLockScope _(CaptureMutex);
- REQUIRE_EQ(Transitions.size(), 4u);
- CHECK_EQ(Transitions[0].OldState, HubInstanceState::Unprovisioned);
- CHECK_EQ(Transitions[0].NewState, HubInstanceState::Provisioning);
- CHECK_EQ(Transitions[1].OldState, HubInstanceState::Provisioning);
- CHECK_EQ(Transitions[1].NewState, HubInstanceState::Provisioned);
- CHECK_EQ(Transitions[2].OldState, HubInstanceState::Provisioned);
- CHECK_EQ(Transitions[2].NewState, HubInstanceState::Deprovisioning);
- CHECK_EQ(Transitions[3].OldState, HubInstanceState::Deprovisioning);
- CHECK_EQ(Transitions[3].NewState, HubInstanceState::Unprovisioned);
-}
-
TEST_CASE("hub.instance_limit")
{
ScopedTemporaryDirectory TempDir;
@@ -1810,54 +1819,7 @@ TEST_CASE("hub.instance_limit")
CHECK_EQ(HubInstance->GetInstanceCount(), 2);
}
-TEST_CASE("hub.enumerate_modules")
-{
- ScopedTemporaryDirectory TempDir;
- std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path());
-
- HubProvisionedInstanceInfo Info;
-
- {
- const Hub::Response R = HubInstance->Provision("enum_a", Info);
- REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message);
- }
- {
- const Hub::Response R = HubInstance->Provision("enum_b", Info);
- REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message);
- }
-
- std::vector<std::string> Ids;
- int ProvisionedCount = 0;
- HubInstance->EnumerateModules([&](std::string_view ModuleId, const Hub::InstanceInfo& InstanceInfo) {
- Ids.push_back(std::string(ModuleId));
- if (InstanceInfo.State == HubInstanceState::Provisioned)
- {
- ProvisionedCount++;
- }
- });
- CHECK_EQ(Ids.size(), 2u);
- CHECK_EQ(ProvisionedCount, 2);
- const bool FoundA = std::find(Ids.begin(), Ids.end(), "enum_a") != Ids.end();
- const bool FoundB = std::find(Ids.begin(), Ids.end(), "enum_b") != Ids.end();
- CHECK(FoundA);
- CHECK(FoundB);
-
- HubInstance->Deprovision("enum_a");
- Ids.clear();
- ProvisionedCount = 0;
- HubInstance->EnumerateModules([&](std::string_view ModuleId, const Hub::InstanceInfo& InstanceInfo) {
- Ids.push_back(std::string(ModuleId));
- if (InstanceInfo.State == HubInstanceState::Provisioned)
- {
- ProvisionedCount++;
- }
- });
- REQUIRE_EQ(Ids.size(), 1u);
- CHECK_EQ(Ids[0], "enum_b");
- CHECK_EQ(ProvisionedCount, 1);
-}
-
-TEST_CASE("hub.max_instance_count")
+TEST_CASE("hub.enumerate_and_instance_tracking")
{
ScopedTemporaryDirectory TempDir;
std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path());
@@ -1867,22 +1829,56 @@ TEST_CASE("hub.max_instance_count")
HubProvisionedInstanceInfo Info;
{
- const Hub::Response R = HubInstance->Provision("max_a", Info);
+ const Hub::Response R = HubInstance->Provision("track_a", Info);
REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message);
}
CHECK_GE(HubInstance->GetMaxInstanceCount(), 1);
{
- const Hub::Response R = HubInstance->Provision("max_b", Info);
+ const Hub::Response R = HubInstance->Provision("track_b", Info);
REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message);
}
CHECK_GE(HubInstance->GetMaxInstanceCount(), 2);
+ // Enumerate both modules
+ {
+ std::vector<std::string> Ids;
+ int ProvisionedCount = 0;
+ HubInstance->EnumerateModules([&](std::string_view ModuleId, const Hub::InstanceInfo& InstanceInfo) {
+ Ids.push_back(std::string(ModuleId));
+ if (InstanceInfo.State == HubInstanceState::Provisioned)
+ {
+ ProvisionedCount++;
+ }
+ });
+ CHECK_EQ(Ids.size(), 2u);
+ CHECK_EQ(ProvisionedCount, 2);
+ CHECK(std::find(Ids.begin(), Ids.end(), "track_a") != Ids.end());
+ CHECK(std::find(Ids.begin(), Ids.end(), "track_b") != Ids.end());
+ }
+
const int MaxAfterTwo = HubInstance->GetMaxInstanceCount();
- HubInstance->Deprovision("max_a");
+ // Deprovision one - max instance count must not decrease
+ HubInstance->Deprovision("track_a");
CHECK_EQ(HubInstance->GetInstanceCount(), 1);
CHECK_EQ(HubInstance->GetMaxInstanceCount(), MaxAfterTwo);
+
+ // Enumerate after deprovision
+ {
+ std::vector<std::string> Ids;
+ int ProvisionedCount = 0;
+ HubInstance->EnumerateModules([&](std::string_view ModuleId, const Hub::InstanceInfo& InstanceInfo) {
+ Ids.push_back(std::string(ModuleId));
+ if (InstanceInfo.State == HubInstanceState::Provisioned)
+ {
+ ProvisionedCount++;
+ }
+ });
+ REQUIRE_EQ(Ids.size(), 1u);
+ CHECK_EQ(Ids[0], "track_b");
+ CHECK_EQ(ProvisionedCount, 1);
+ }
}
TEST_CASE("hub.concurrent_callbacks")
@@ -2038,6 +2034,11 @@ TEST_CASE("hub.hibernate_wake")
HubProvisionedInstanceInfo ProvInfo;
Hub::InstanceInfo Info;
+ // Error cases on non-existent modules (no provision needed)
+ CHECK(HubInstance->Hibernate("never_provisioned").ResponseCode == Hub::EResponseCode::NotFound);
+ CHECK(HubInstance->Wake("never_provisioned").ResponseCode == Hub::EResponseCode::NotFound);
+ CHECK(HubInstance->Deprovision("never_provisioned").ResponseCode == Hub::EResponseCode::NotFound);
+
// Provision
{
const Hub::Response R = HubInstance->Provision("hib_a", ProvInfo);
@@ -2053,9 +2054,14 @@ TEST_CASE("hub.hibernate_wake")
CHECK(ModClient.Get("/health/"));
}
+ // Double-wake on provisioned module is idempotent
+ CHECK(HubInstance->Wake("hib_a").ResponseCode == Hub::EResponseCode::Completed);
+
// Hibernate
- const Hub::Response HibernateResult = HubInstance->Hibernate("hib_a");
- REQUIRE_MESSAGE(HibernateResult.ResponseCode == Hub::EResponseCode::Completed, HibernateResult.Message);
+ {
+ const Hub::Response R = HubInstance->Hibernate("hib_a");
+ REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message);
+ }
REQUIRE(HubInstance->Find("hib_a", &Info));
CHECK_EQ(Info.State, HubInstanceState::Hibernated);
const std::chrono::system_clock::time_point HibernatedTime = Info.StateChangeTime;
@@ -2065,9 +2071,14 @@ TEST_CASE("hub.hibernate_wake")
CHECK(!ModClient.Get("/health/"));
}
+ // Double-hibernate on already-hibernated module is idempotent
+ CHECK(HubInstance->Hibernate("hib_a").ResponseCode == Hub::EResponseCode::Completed);
+
// Wake
- const Hub::Response WakeResult = HubInstance->Wake("hib_a");
- REQUIRE_MESSAGE(WakeResult.ResponseCode == Hub::EResponseCode::Completed, WakeResult.Message);
+ {
+ const Hub::Response R = HubInstance->Wake("hib_a");
+ REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message);
+ }
REQUIRE(HubInstance->Find("hib_a", &Info));
CHECK_EQ(Info.State, HubInstanceState::Provisioned);
CHECK_GE(Info.StateChangeTime, HibernatedTime);
@@ -2086,49 +2097,6 @@ TEST_CASE("hub.hibernate_wake")
}
}
-TEST_CASE("hub.hibernate_wake_errors")
-{
- ScopedTemporaryDirectory TempDir;
- Hub::Configuration Config;
- Config.BasePortNumber = 22700;
- std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config);
-
- HubProvisionedInstanceInfo ProvInfo;
-
- // Hibernate/wake on a non-existent module - returns NotFound (-> 404)
- CHECK(HubInstance->Hibernate("never_provisioned").ResponseCode == Hub::EResponseCode::NotFound);
- CHECK(HubInstance->Wake("never_provisioned").ResponseCode == Hub::EResponseCode::NotFound);
-
- // Double-hibernate: second hibernate on already-hibernated module returns Completed (idempotent)
- {
- const Hub::Response R = HubInstance->Provision("err_b", ProvInfo);
- REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message);
- }
- {
- const Hub::Response R = HubInstance->Hibernate("err_b");
- REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message);
- }
-
- {
- const Hub::Response HibResp = HubInstance->Hibernate("err_b");
- CHECK(HibResp.ResponseCode == Hub::EResponseCode::Completed);
- }
-
- // Wake on provisioned: succeeds (-> Provisioned), then wake again returns Completed (idempotent)
- {
- const Hub::Response R = HubInstance->Wake("err_b");
- REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message);
- }
-
- {
- const Hub::Response WakeResp = HubInstance->Wake("err_b");
- CHECK(WakeResp.ResponseCode == Hub::EResponseCode::Completed);
- }
-
- // Deprovision not-found - returns NotFound (-> 404)
- CHECK(HubInstance->Deprovision("never_provisioned").ResponseCode == Hub::EResponseCode::NotFound);
-}
-
TEST_CASE("hub.async_hibernate_wake")
{
ScopedTemporaryDirectory TempDir;
@@ -2136,8 +2104,8 @@ TEST_CASE("hub.async_hibernate_wake")
Hub::Configuration Config;
Config.BasePortNumber = 23000;
- WorkerThreadPool WorkerPool(2, "hub_async_hib_wake");
- std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config, {}, &WorkerPool);
+ hub_testutils::TestHubPools Pools(2);
+ std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config, {}, &Pools);
HubProvisionedInstanceInfo ProvInfo;
Hub::InstanceInfo Info;
@@ -2267,25 +2235,21 @@ TEST_CASE("hub.recover_process_crash")
if (HubInstance->Find("module_a", &InstanceInfo) && InstanceInfo.State == HubInstanceState::Provisioned &&
ModClient.Get("/health/"))
{
- // Recovery must reuse the same port - the instance was never removed from the hub's
- // port table during recovery, so AttemptRecoverInstance reuses m_Config.BasePort.
CHECK_EQ(InstanceInfo.Port, Info.Port);
Recovered = true;
break;
}
}
- CHECK_MESSAGE(Recovered, "Instance did not recover within timeout");
+ REQUIRE_MESSAGE(Recovered, "Instance did not recover within timeout");
// Verify the full crash/recovery callback sequence
{
RwLock::SharedLockScope _(CaptureMutex);
REQUIRE_GE(Transitions.size(), 3u);
- // Find the Provisioned->Crashed transition
const auto CrashedIt = std::find_if(Transitions.begin(), Transitions.end(), [](const TransitionRecord& R) {
return R.OldState == HubInstanceState::Provisioned && R.NewState == HubInstanceState::Crashed;
});
REQUIRE_NE(CrashedIt, Transitions.end());
- // Recovery sequence follows: Crashed->Recovering, Recovering->Provisioned
const auto RecoveringIt = CrashedIt + 1;
REQUIRE_NE(RecoveringIt, Transitions.end());
CHECK_EQ(RecoveringIt->OldState, HubInstanceState::Crashed);
@@ -2295,44 +2259,6 @@ TEST_CASE("hub.recover_process_crash")
CHECK_EQ(RecoveredIt->OldState, HubInstanceState::Recovering);
CHECK_EQ(RecoveredIt->NewState, HubInstanceState::Provisioned);
}
-}
-
-TEST_CASE("hub.recover_process_crash_then_deprovision")
-{
- ScopedTemporaryDirectory TempDir;
-
- // Fast watchdog cycle so crash detection is near-instant instead of waiting up to the 3s default.
- Hub::Configuration Config;
- Config.WatchDog.CycleInterval = std::chrono::milliseconds(10);
- Config.WatchDog.InstanceCheckThrottle = std::chrono::milliseconds(1);
-
- std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config);
-
- HubProvisionedInstanceInfo Info;
- {
- const Hub::Response R = HubInstance->Provision("module_a", Info);
- REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message);
- }
-
- // Kill the child process, wait for the watchdog to detect and recover the instance.
- HubInstance->TerminateModuleForTesting("module_a");
-
- constexpr auto kPollIntervalMs = std::chrono::milliseconds(50);
- constexpr auto kTimeoutMs = std::chrono::seconds(15);
- const auto Deadline = std::chrono::steady_clock::now() + kTimeoutMs;
-
- bool Recovered = false;
- while (std::chrono::steady_clock::now() < Deadline)
- {
- std::this_thread::sleep_for(kPollIntervalMs);
- Hub::InstanceInfo InstanceInfo;
- if (HubInstance->Find("module_a", &InstanceInfo) && InstanceInfo.State == HubInstanceState::Provisioned)
- {
- Recovered = true;
- break;
- }
- }
- REQUIRE_MESSAGE(Recovered, "Instance did not recover within timeout");
// After recovery, deprovision should succeed and a re-provision should work.
{
@@ -2361,8 +2287,8 @@ TEST_CASE("hub.async_provision_concurrent")
Config.BasePortNumber = 22800;
Config.InstanceLimit = kModuleCount;
- WorkerThreadPool WorkerPool(4, "hub_async_concurrent");
- std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config, {}, &WorkerPool);
+ hub_testutils::TestHubPools Pools(4);
+ std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config, {}, &Pools);
std::vector<HubProvisionedInstanceInfo> Infos(kModuleCount);
std::vector<std::string> Reasons(kModuleCount);
@@ -2443,8 +2369,8 @@ TEST_CASE("hub.async_provision_shutdown_waits")
Config.InstanceLimit = kModuleCount;
Config.BasePortNumber = 22900;
- WorkerThreadPool WorkerPool(2, "hub_async_shutdown");
- std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config, {}, &WorkerPool);
+ hub_testutils::TestHubPools Pools(2);
+ std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config, {}, &Pools);
std::vector<HubProvisionedInstanceInfo> Infos(kModuleCount);
@@ -2476,8 +2402,8 @@ TEST_CASE("hub.async_provision_rejected")
Config.InstanceLimit = 1;
Config.BasePortNumber = 23100;
- WorkerThreadPool WorkerPool(2, "hub_async_rejected");
- std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config, {}, &WorkerPool);
+ hub_testutils::TestHubPools Pools(2);
+ std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config, {}, &Pools);
HubProvisionedInstanceInfo Info;
@@ -2565,12 +2491,12 @@ TEST_CASE("hub.instance.inactivity.deprovision")
// Phase 1: immediately after setup all three instances must still be alive.
// No timeout has elapsed yet (only 100ms have passed).
- CHECK_MESSAGE(HubInstance->Find("idle"), "idle was deprovisioned within 100ms - its 2s provisioned timeout has not elapsed");
+ CHECK_MESSAGE(HubInstance->Find("idle"), "idle was deprovisioned within 100ms - its 4s provisioned timeout has not elapsed");
CHECK_MESSAGE(HubInstance->Find("idle_hib"), "idle_hib was deprovisioned within 100ms - its 1s hibernated timeout has not elapsed");
CHECK_MESSAGE(HubInstance->Find("persistent"),
- "persistent was deprovisioned within 100ms - its 2s provisioned timeout has not elapsed");
+ "persistent was deprovisioned within 100ms - its 4s provisioned timeout has not elapsed");
// Phase 2: idle_hib must be deprovisioned by the watchdog within its 1s hibernated timeout.
// idle must remain alive - its 2s provisioned timeout has not elapsed yet.
@@ -2594,7 +2520,7 @@ TEST_CASE("hub.instance.inactivity.deprovision")
CHECK_MESSAGE(!HubInstance->Find("idle_hib"), "idle_hib should still be gone - it was deprovisioned in phase 2");
- CHECK_MESSAGE(!HubInstance->Find("idle"), "idle should be gone after its 3s provisioned timeout elapsed");
+ CHECK_MESSAGE(!HubInstance->Find("idle"), "idle should be gone after its 4s provisioned timeout elapsed");
CHECK_MESSAGE(HubInstance->Find("persistent"),
"persistent was incorrectly deprovisioned - its activity timer was reset by PokeInstance");
diff --git a/src/zenserver/hub/hub.h b/src/zenserver/hub/hub.h
index 8ee9130f6..071b14f35 100644
--- a/src/zenserver/hub/hub.h
+++ b/src/zenserver/hub/hub.h
@@ -73,6 +73,9 @@ public:
WatchDogConfiguration WatchDog;
ResourceMetrics ResourceLimits;
+
+ WorkerThreadPool* OptionalProvisionWorkerPool = nullptr;
+ WorkerThreadPool* OptionalHydrationWorkerPool = nullptr;
};
typedef std::function<
@@ -81,7 +84,6 @@ public:
Hub(const Configuration& Config,
ZenServerEnvironment&& RunEnvironment,
- WorkerThreadPool* OptionalWorkerPool = nullptr,
AsyncModuleStateChangeCallbackFunc&& ModuleStateChangeCallback = {});
~Hub();
@@ -278,9 +280,9 @@ private:
size_t ActiveInstanceIndex,
HubInstanceState OldState,
bool IsNewInstance);
- void CompleteDeprovision(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex);
- void CompleteHibernate(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex, HubInstanceState OldState);
- void CompleteWake(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex, HubInstanceState OldState);
+ void CompleteDeprovision(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex, HubInstanceState OldState);
+ void CompleteHibernate(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex, HubInstanceState OldState);
+ void CompleteWake(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex, HubInstanceState OldState);
// Notifications may fire slightly out of sync with the Hub's internal State flag.
// The guarantee is that notifications are sent in the correct order, but the State
diff --git a/src/zenserver/hub/hydration.cpp b/src/zenserver/hub/hydration.cpp
index ed16bfe56..cf36d8646 100644
--- a/src/zenserver/hub/hydration.cpp
+++ b/src/zenserver/hub/hydration.cpp
@@ -5,21 +5,25 @@
#include <zencore/basicfile.h>
#include <zencore/compactbinary.h>
#include <zencore/compactbinarybuilder.h>
+#include <zencore/compactbinaryutil.h>
+#include <zencore/compress.h>
#include <zencore/except_fmt.h>
#include <zencore/filesystem.h>
#include <zencore/fmtutils.h>
#include <zencore/logging.h>
+#include <zencore/parallelwork.h>
+#include <zencore/stream.h>
#include <zencore/system.h>
#include <zencore/timer.h>
#include <zenutil/cloud/imdscredentials.h>
#include <zenutil/cloud/s3client.h>
+#include <zenutil/filesystemutils.h>
-ZEN_THIRD_PARTY_INCLUDES_START
-#include <json11.hpp>
-ZEN_THIRD_PARTY_INCLUDES_END
+#include <numeric>
+#include <unordered_map>
+#include <unordered_set>
#if ZEN_WITH_TESTS
-# include <zencore/parallelwork.h>
# include <zencore/testing.h>
# include <zencore/testutils.h>
# include <zencore/thread.h>
@@ -30,7 +34,7 @@ ZEN_THIRD_PARTY_INCLUDES_END
namespace zen {
-namespace {
+namespace hydration_impl {
/// UTC time decomposed to calendar fields with sub-second milliseconds.
struct UtcTime
@@ -56,609 +60,992 @@ namespace {
}
};
-} // namespace
-
-///////////////////////////////////////////////////////////////////////////
-
-constexpr std::string_view FileHydratorPrefix = "file://";
-constexpr std::string_view FileHydratorType = "file";
-
-struct FileHydrator : public HydrationStrategyBase
-{
- virtual void Configure(const HydrationConfig& Config) override;
- virtual void Hydrate() override;
- virtual void Dehydrate() override;
-
-private:
- HydrationConfig m_Config;
- std::filesystem::path m_StorageModuleRootDir;
-};
-
-void
-FileHydrator::Configure(const HydrationConfig& Config)
-{
- m_Config = Config;
-
- std::filesystem::path ConfigPath;
- if (!m_Config.TargetSpecification.empty())
+ std::filesystem::path FastRelativePath(const std::filesystem::path& Root, const std::filesystem::path& Abs)
{
- ConfigPath = Utf8ToWide(m_Config.TargetSpecification.substr(FileHydratorPrefix.length()));
- }
- else
- {
- CbObjectView Settings = m_Config.Options["settings"].AsObjectView();
- std::string_view Path = Settings["path"].AsString();
- if (Path.empty())
+ auto [_, ItAbs] = std::mismatch(Root.begin(), Root.end(), Abs.begin(), Abs.end());
+ std::filesystem::path RelativePath;
+ for (auto I = ItAbs; I != Abs.end(); I++)
{
- throw zen::runtime_error("Hydration config 'file' type requires 'settings.path'");
+ RelativePath = RelativePath / *I;
}
- ConfigPath = Utf8ToWide(std::string(Path));
+ return RelativePath;
}
- MakeSafeAbsolutePathInPlace(ConfigPath);
- if (!std::filesystem::exists(ConfigPath))
+ void CleanDirectory(WorkerThreadPool& WorkerPool,
+ std::atomic<bool>& AbortFlag,
+ std::atomic<bool>& PauseFlag,
+ const std::filesystem::path& Path)
{
- throw std::invalid_argument(fmt::format("Target does not exist: '{}'", ConfigPath.string()));
+ CleanDirectory(WorkerPool, AbortFlag, PauseFlag, Path, std::vector<std::string>{}, {}, 0);
}
- m_StorageModuleRootDir = ConfigPath / m_Config.ModuleId;
+ class StorageBase
+ {
+ public:
+ virtual ~StorageBase() {}
+
+ virtual void Configure(std::string_view ModuleId,
+ const std::filesystem::path& TempDir,
+ std::string_view TargetSpecification,
+ const CbObject& Options) = 0;
+ virtual void SaveMetadata(const CbObject& Data) = 0;
+ virtual CbObject LoadMetadata() = 0;
+ virtual CbObject GetSettings() = 0;
+ virtual void ParseSettings(const CbObjectView& Settings) = 0;
+ virtual std::vector<IoHash> List() = 0;
+ virtual void Put(ParallelWork& Work,
+ WorkerThreadPool& WorkerPool,
+ const IoHash& Hash,
+ uint64_t Size,
+ const std::filesystem::path& SourcePath) = 0;
+ virtual void Get(ParallelWork& Work,
+ WorkerThreadPool& WorkerPool,
+ const IoHash& Hash,
+ uint64_t Size,
+ const std::filesystem::path& DestinationPath) = 0;
+ };
- CreateDirectories(m_StorageModuleRootDir);
-}
+ constexpr std::string_view FileHydratorPrefix = "file://";
+ constexpr std::string_view FileHydratorType = "file";
-void
-FileHydrator::Hydrate()
-{
- ZEN_INFO("Hydrating state from '{}' to '{}'", m_StorageModuleRootDir, m_Config.ServerStateDir);
+ constexpr std::string_view S3HydratorPrefix = "s3://";
+ constexpr std::string_view S3HydratorType = "s3";
- Stopwatch Timer;
+ class FileStorage : public StorageBase
+ {
+ public:
+ FileStorage() {}
+ virtual void Configure(std::string_view ModuleId,
+ const std::filesystem::path& TempDir,
+ std::string_view TargetSpecification,
+ const CbObject& Options)
+ {
+ ZEN_UNUSED(TempDir);
+ if (!TargetSpecification.empty())
+ {
+ m_StoragePath = Utf8ToWide(TargetSpecification.substr(FileHydratorPrefix.length()));
+ if (m_StoragePath.empty())
+ {
+ throw zen::runtime_error("Hydration config 'file' type requires a directory path");
+ }
+ }
+ else
+ {
+ CbObjectView Settings = Options["settings"].AsObjectView();
+ std::string_view Path = Settings["path"].AsString();
+ if (Path.empty())
+ {
+ throw zen::runtime_error("Hydration config 'file' type requires 'settings.path'");
+ }
+ m_StoragePath = Utf8ToWide(std::string(Path));
+ }
+ m_StoragePath = m_StoragePath / ModuleId;
+ MakeSafeAbsolutePathInPlace(m_StoragePath);
+
+ m_StatePathName = m_StoragePath / "current-state.cbo";
+ m_CASPath = m_StoragePath / "cas";
+ CreateDirectories(m_CASPath);
+ }
+ virtual void SaveMetadata(const CbObject& Data)
+ {
+ BinaryWriter Output;
+ SaveCompactBinary(Output, Data);
+ WriteFile(m_StatePathName, IoBuffer(IoBuffer::Wrap, Output.GetData(), Output.GetSize()));
+ }
+ virtual CbObject LoadMetadata()
+ {
+ if (!IsFile(m_StatePathName))
+ {
+ return {};
+ }
+ FileContents Content = ReadFile(m_StatePathName);
+ if (Content.ErrorCode)
+ {
+ ThrowSystemError(Content.ErrorCode.value(), "Failed to read state file");
+ }
+ IoBuffer Payload = Content.Flatten();
+ CbValidateError Error;
+ CbObject Result = ValidateAndReadCompactBinaryObject(std::move(Payload), Error);
+ if (Error != CbValidateError::None)
+ {
+ throw std::runtime_error(fmt::format("Failed to read {} state file. Reason: {}", m_StatePathName, ToString(Error)));
+ }
+ return Result;
+ }
- // Ensure target is clean
- ZEN_DEBUG("Wiping server state at '{}'", m_Config.ServerStateDir);
- const bool ForceRemoveReadOnlyFiles = true;
- CleanDirectory(m_Config.ServerStateDir, ForceRemoveReadOnlyFiles);
+ virtual CbObject GetSettings() override { return {}; }
+ virtual void ParseSettings(const CbObjectView& Settings) { ZEN_UNUSED(Settings); }
- bool WipeServerState = false;
+ virtual std::vector<IoHash> List()
+ {
+ DirectoryContent DirContent;
+ GetDirectoryContent(m_CASPath, DirectoryContentFlags::IncludeFiles, DirContent);
+ std::vector<IoHash> Result;
+ Result.reserve(DirContent.Files.size());
+ for (const std::filesystem::path& Path : DirContent.Files)
+ {
+ IoHash Hash;
+ if (IoHash::TryParse(Path.filename().string(), Hash))
+ {
+ Result.push_back(Hash);
+ }
+ }
+ return Result;
+ }
- try
- {
- ZEN_DEBUG("Copying '{}' to '{}'", m_StorageModuleRootDir, m_Config.ServerStateDir);
- CopyTree(m_StorageModuleRootDir, m_Config.ServerStateDir, {.EnableClone = true});
- }
- catch (std::exception& Ex)
- {
- ZEN_WARN("Copy failed: {}. Will wipe any partially copied state from '{}'", Ex.what(), m_Config.ServerStateDir);
+ virtual void Put(ParallelWork& Work,
+ WorkerThreadPool& WorkerPool,
+ const IoHash& Hash,
+ uint64_t Size,
+ const std::filesystem::path& SourcePath)
+ {
+ ZEN_UNUSED(Size);
+ Work.ScheduleWork(WorkerPool,
+ [this, Hash = IoHash(Hash), SourcePath = std::filesystem::path(SourcePath)](std::atomic<bool>& AbortFlag) {
+ if (!AbortFlag.load())
+ {
+ CopyFile(SourcePath, m_CASPath / fmt::format("{}", Hash), CopyFileOptions{.EnableClone = true});
+ }
+ });
+ }
- // We don't do the clean right here to avoid potentially running into double-throws
- WipeServerState = true;
- }
+ virtual void Get(ParallelWork& Work,
+ WorkerThreadPool& WorkerPool,
+ const IoHash& Hash,
+ uint64_t Size,
+ const std::filesystem::path& DestinationPath)
+ {
+ ZEN_UNUSED(Size);
+ Work.ScheduleWork(
+ WorkerPool,
+ [this, Hash = IoHash(Hash), DestinationPath = std::filesystem::path(DestinationPath)](std::atomic<bool>& AbortFlag) {
+ if (!AbortFlag.load())
+ {
+ CopyFile(m_CASPath / fmt::format("{}", Hash), DestinationPath, CopyFileOptions{.EnableClone = true});
+ }
+ });
+ }
- if (WipeServerState)
- {
- ZEN_DEBUG("Cleaning server state '{}'", m_Config.ServerStateDir);
- CleanDirectory(m_Config.ServerStateDir, ForceRemoveReadOnlyFiles);
- }
- else
+ private:
+ std::filesystem::path m_StoragePath;
+ std::filesystem::path m_StatePathName;
+ std::filesystem::path m_CASPath;
+ };
+
+ class S3Storage : public StorageBase
{
- ZEN_INFO("Hydration complete in {}", NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
- }
-}
+ public:
+ S3Storage() {}
-void
-FileHydrator::Dehydrate()
-{
- ZEN_INFO("Dehydrating state from '{}' to '{}'", m_Config.ServerStateDir, m_StorageModuleRootDir);
+ virtual void Configure(std::string_view ModuleId,
+ const std::filesystem::path& TempDir,
+ std::string_view TargetSpecification,
+ const CbObject& Options)
+ {
+ m_Options = Options;
- Stopwatch Timer;
+ CbObjectView Settings = m_Options["settings"].AsObjectView();
+ std::string_view Spec;
+ if (!TargetSpecification.empty())
+ {
+ Spec = TargetSpecification;
+ Spec.remove_prefix(S3HydratorPrefix.size());
+ }
+ else
+ {
+ std::string_view Uri = Settings["uri"].AsString();
+ if (Uri.empty())
+ {
+ throw zen::runtime_error("Incremental S3 hydration config requires 'settings.uri'");
+ }
+ Spec = Uri;
+ Spec.remove_prefix(S3HydratorPrefix.size());
+ }
- const std::filesystem::path TargetDir = m_StorageModuleRootDir;
+ size_t SlashPos = Spec.find('/');
+ std::string UserPrefix = SlashPos != std::string_view::npos ? std::string(Spec.substr(SlashPos + 1)) : std::string{};
+ m_Bucket = std::string(SlashPos != std::string_view::npos ? Spec.substr(0, SlashPos) : Spec);
+ m_KeyPrefix = UserPrefix.empty() ? std::string(ModuleId) : UserPrefix + "/" + std::string(ModuleId);
- // Ensure target is clean. This could be replaced with an atomic copy at a later date
- // (i.e copy into a temporary directory name and rename it once complete)
+ ZEN_ASSERT(!m_Bucket.empty());
- ZEN_DEBUG("Cleaning storage root '{}'", TargetDir);
- const bool ForceRemoveReadOnlyFiles = true;
- CleanDirectory(TargetDir, ForceRemoveReadOnlyFiles);
+ std::string Region = std::string(Settings["region"].AsString());
+ if (Region.empty())
+ {
+ Region = GetEnvVariable("AWS_DEFAULT_REGION");
+ }
+ if (Region.empty())
+ {
+ Region = GetEnvVariable("AWS_REGION");
+ }
+ if (Region.empty())
+ {
+ Region = "us-east-1";
+ }
+ m_Region = std::move(Region);
- bool CopySuccess = true;
+ std::string AccessKeyId = GetEnvVariable("AWS_ACCESS_KEY_ID");
+ if (AccessKeyId.empty())
+ {
+ m_CredentialProvider = Ref<ImdsCredentialProvider>(new ImdsCredentialProvider({}));
+ }
+ else
+ {
+ m_Credentials.AccessKeyId = std::move(AccessKeyId);
+ m_Credentials.SecretAccessKey = GetEnvVariable("AWS_SECRET_ACCESS_KEY");
+ m_Credentials.SessionToken = GetEnvVariable("AWS_SESSION_TOKEN");
+ }
+ m_TempDir = TempDir;
+ m_Client = CreateS3Client();
+ }
- try
- {
- ZEN_DEBUG("Copying '{}' to '{}'", m_Config.ServerStateDir, TargetDir);
- for (const std::filesystem::directory_entry& Entry : std::filesystem::directory_iterator(m_Config.ServerStateDir))
+ virtual void SaveMetadata(const CbObject& Data)
{
- if (Entry.path().filename() == ".sentry-native")
+ S3Client& Client = *m_Client;
+ BinaryWriter Output;
+ SaveCompactBinary(Output, Data);
+ IoBuffer Payload(IoBuffer::Clone, Output.GetData(), Output.GetSize());
+
+ std::string Key = m_KeyPrefix + "/incremental-state.cbo";
+ S3Result Result = Client.PutObject(Key, std::move(Payload));
+ if (!Result.IsSuccess())
{
- continue;
+ throw zen::runtime_error("Failed to save incremental metadata to '{}': {}", Key, Result.Error);
}
- std::filesystem::path Dest = TargetDir / Entry.path().filename();
- if (Entry.is_directory())
+ }
+
+ virtual CbObject LoadMetadata()
+ {
+ S3Client& Client = *m_Client;
+ std::string Key = m_KeyPrefix + "/incremental-state.cbo";
+ S3GetObjectResult Result = Client.GetObject(Key);
+ if (!Result.IsSuccess())
{
- CreateDirectories(Dest);
- CopyTree(Entry.path(), Dest, {.EnableClone = true});
+ if (Result.Error == S3GetObjectResult::NotFoundErrorText)
+ {
+ return {};
+ }
+ throw zen::runtime_error("Failed to load incremental metadata from '{}': {}", Key, Result.Error);
}
- else
+
+ CbValidateError Error;
+ CbObject Meta = ValidateAndReadCompactBinaryObject(std::move(Result.Content), Error);
+ if (Error != CbValidateError::None)
{
- CopyFile(Entry.path(), Dest, {.EnableClone = true});
+ throw zen::runtime_error("Failed to parse incremental metadata from '{}': {}", Key, ToString(Error));
}
+ return Meta;
}
- }
- catch (std::exception& Ex)
- {
- ZEN_WARN("Copy failed: {}. Will wipe any partially copied state from '{}'", Ex.what(), m_StorageModuleRootDir);
- // We don't do the clean right here to avoid potentially running into double-throws
- CopySuccess = false;
- }
+ virtual CbObject GetSettings() override
+ {
+ CbObjectWriter Writer;
+ Writer << "MultipartChunkSize" << m_MultipartChunkSize;
+ return Writer.Save();
+ }
- if (!CopySuccess)
- {
- ZEN_DEBUG("Removing partially copied state from '{}'", TargetDir);
- CleanDirectory(TargetDir, ForceRemoveReadOnlyFiles);
- }
+ virtual void ParseSettings(const CbObjectView& Settings)
+ {
+ m_MultipartChunkSize = Settings["MultipartChunkSize"].AsUInt64(DefaultMultipartChunkSize);
+ }
- ZEN_DEBUG("Wiping server state '{}'", m_Config.ServerStateDir);
- CleanDirectory(m_Config.ServerStateDir, ForceRemoveReadOnlyFiles);
+ virtual std::vector<IoHash> List()
+ {
+ S3Client& Client = *m_Client;
+ std::string Prefix = m_KeyPrefix + "/cas/";
+ S3ListObjectsResult Result = Client.ListObjects(Prefix);
+ if (!Result.IsSuccess())
+ {
+ throw zen::runtime_error("Failed to list S3 objects under '{}': {}", Prefix, Result.Error);
+ }
- if (CopySuccess)
- {
- ZEN_INFO("Dehydration complete in {}", NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
- }
-}
+ std::vector<IoHash> Hashes;
+ Hashes.reserve(Result.Objects.size());
+ for (const S3ObjectInfo& Obj : Result.Objects)
+ {
+ size_t LastSlash = Obj.Key.rfind('/');
+ if (LastSlash == std::string::npos)
+ {
+ continue;
+ }
+ IoHash Hash;
+ if (IoHash::TryParse(Obj.Key.substr(LastSlash + 1), Hash))
+ {
+ Hashes.push_back(Hash);
+ }
+ }
+ return Hashes;
+ }
-///////////////////////////////////////////////////////////////////////////
+ virtual void Put(ParallelWork& Work,
+ WorkerThreadPool& WorkerPool,
+ const IoHash& Hash,
+ uint64_t Size,
+ const std::filesystem::path& SourcePath)
+ {
+ Work.ScheduleWork(
+ WorkerPool,
+ [this, Hash = IoHash(Hash), Size, SourcePath = std::filesystem::path(SourcePath)](std::atomic<bool>& AbortFlag) {
+ if (AbortFlag.load())
+ {
+ return;
+ }
+ S3Client& Client = *m_Client;
+ std::string Key = m_KeyPrefix + "/cas/" + fmt::format("{}", Hash);
-constexpr std::string_view S3HydratorPrefix = "s3://";
-constexpr std::string_view S3HydratorType = "s3";
+ if (Size >= (m_MultipartChunkSize + (m_MultipartChunkSize / 4)))
+ {
+ BasicFile File(SourcePath, BasicFile::Mode::kRead);
+ S3Result Result = Client.PutObjectMultipart(
+ Key,
+ Size,
+ [&File](uint64_t Offset, uint64_t ChunkSize) { return File.ReadRange(Offset, ChunkSize); },
+ m_MultipartChunkSize);
+ if (!Result.IsSuccess())
+ {
+ throw zen::runtime_error("Failed to upload '{}' to S3: {}", Key, Result.Error);
+ }
+ }
+ else
+ {
+ BasicFile File(SourcePath, BasicFile::Mode::kRead);
+ S3Result Result = Client.PutObject(Key, File.ReadAll());
+ if (!Result.IsSuccess())
+ {
+ throw zen::runtime_error("Failed to upload '{}' to S3: {}", Key, Result.Error);
+ }
+ }
+ });
+ }
-struct S3Hydrator : public HydrationStrategyBase
-{
- void Configure(const HydrationConfig& Config) override;
- void Dehydrate() override;
- void Hydrate() override;
+ virtual void Get(ParallelWork& Work,
+ WorkerThreadPool& WorkerPool,
+ const IoHash& Hash,
+ uint64_t Size,
+ const std::filesystem::path& DestinationPath)
+ {
+ std::string Key = m_KeyPrefix + "/cas/" + fmt::format("{}", Hash);
-private:
- S3Client CreateS3Client() const;
- std::string BuildTimestampFolderName() const;
- std::string MakeObjectKey(std::string_view FolderName, const std::filesystem::path& RelPath) const;
-
- HydrationConfig m_Config;
- std::string m_Bucket;
- std::string m_KeyPrefix; // "<user-prefix>/<ModuleId>" or just "<ModuleId>" - no trailing slash
- std::string m_Region;
- SigV4Credentials m_Credentials;
- Ref<ImdsCredentialProvider> m_CredentialProvider;
-
- static constexpr uint64_t MultipartChunkSize = 8 * 1024 * 1024;
-};
+ if (Size >= (m_MultipartChunkSize + (m_MultipartChunkSize / 4)))
+ {
+ class WorkData
+ {
+ public:
+ WorkData(const std::filesystem::path& DestPath, uint64_t Size) : m_DestFile(DestPath, BasicFile::Mode::kTruncate)
+ {
+ PrepareFileForScatteredWrite(m_DestFile.Handle(), Size);
+ }
+ ~WorkData() { m_DestFile.Flush(); }
+ void Write(const void* Data, uint64_t Size, uint64_t Offset) { m_DestFile.Write(Data, Size, Offset); }
-void
-S3Hydrator::Configure(const HydrationConfig& Config)
-{
- m_Config = Config;
+ private:
+ BasicFile m_DestFile;
+ };
- CbObjectView Settings = m_Config.Options["settings"].AsObjectView();
- std::string_view Spec;
- if (!m_Config.TargetSpecification.empty())
- {
- Spec = m_Config.TargetSpecification;
- Spec.remove_prefix(S3HydratorPrefix.size());
- }
- else
- {
- std::string_view Uri = Settings["uri"].AsString();
- if (Uri.empty())
+ std::shared_ptr<WorkData> Data = std::make_shared<WorkData>(DestinationPath, Size);
+
+ uint64_t Offset = 0;
+ while (Offset < Size)
+ {
+ uint64_t ChunkSize = std::min<uint64_t>(m_MultipartChunkSize, Size - Offset);
+
+ Work.ScheduleWork(WorkerPool, [this, Key = Key, Offset, ChunkSize, Data](std::atomic<bool>& AbortFlag) {
+ if (AbortFlag)
+ {
+ return;
+ }
+ S3GetObjectResult Chunk = m_Client->GetObjectRange(Key, Offset, ChunkSize);
+ if (!Chunk.IsSuccess())
+ {
+ throw zen::runtime_error("Failed to download '{}' bytes [{}-{}] from S3: {}",
+ Key,
+ Offset,
+ Offset + ChunkSize - 1,
+ Chunk.Error);
+ }
+
+ Data->Write(Chunk.Content.GetData(), Chunk.Content.GetSize(), Offset);
+ });
+ Offset += ChunkSize;
+ }
+ }
+ else
+ {
+ Work.ScheduleWork(
+ WorkerPool,
+ [this, Key = Key, DestinationPath = std::filesystem::path(DestinationPath)](std::atomic<bool>& AbortFlag) {
+ if (AbortFlag)
+ {
+ return;
+ }
+ S3GetObjectResult Chunk = m_Client->GetObject(Key, m_TempDir);
+ if (!Chunk.IsSuccess())
+ {
+ throw zen::runtime_error("Failed to download '{}' from S3: {}", Key, Chunk.Error);
+ }
+
+ if (IoBufferFileReference FileRef; Chunk.Content.GetFileReference(FileRef))
+ {
+ std::error_code Ec;
+ std::filesystem::path ChunkPath = PathFromHandle(FileRef.FileHandle, Ec);
+ if (Ec)
+ {
+ WriteFile(DestinationPath, Chunk.Content);
+ }
+ else
+ {
+ Chunk.Content.SetDeleteOnClose(false);
+ Chunk.Content = {};
+ RenameFile(ChunkPath, DestinationPath, Ec);
+ if (Ec)
+ {
+ Chunk.Content = IoBufferBuilder::MakeFromFile(ChunkPath);
+ Chunk.Content.SetDeleteOnClose(true);
+ WriteFile(DestinationPath, Chunk.Content);
+ }
+ }
+ }
+ else
+ {
+ WriteFile(DestinationPath, Chunk.Content);
+ }
+ });
+ }
+ }
+
+ private:
+ std::unique_ptr<S3Client> CreateS3Client() const
{
- throw zen::runtime_error("Hydration config 's3' type requires 'settings.uri'");
+ S3ClientOptions Options;
+ Options.BucketName = m_Bucket;
+ Options.Region = m_Region;
+
+ CbObjectView Settings = m_Options["settings"].AsObjectView();
+ std::string_view Endpoint = Settings["endpoint"].AsString();
+ if (!Endpoint.empty())
+ {
+ Options.Endpoint = std::string(Endpoint);
+ Options.PathStyle = Settings["path-style"].AsBool();
+ }
+
+ if (m_CredentialProvider)
+ {
+ Options.CredentialProvider = m_CredentialProvider;
+ }
+ else
+ {
+ Options.Credentials = m_Credentials;
+ }
+
+ Options.HttpSettings.MaximumInMemoryDownloadSize = 16u * 1024u;
+
+ return std::make_unique<S3Client>(Options);
}
- Spec = Uri;
- Spec.remove_prefix(S3HydratorPrefix.size());
- }
- size_t SlashPos = Spec.find('/');
- std::string UserPrefix = SlashPos != std::string_view::npos ? std::string(Spec.substr(SlashPos + 1)) : std::string{};
- m_Bucket = std::string(SlashPos != std::string_view::npos ? Spec.substr(0, SlashPos) : Spec);
- m_KeyPrefix = UserPrefix.empty() ? m_Config.ModuleId : UserPrefix + "/" + m_Config.ModuleId;
+ static constexpr uint64_t DefaultMultipartChunkSize = 32u * 1024u * 1024u;
+
+ std::string m_KeyPrefix;
+ CbObject m_Options;
+ std::string m_Bucket;
+ std::string m_Region;
+ SigV4Credentials m_Credentials;
+ Ref<ImdsCredentialProvider> m_CredentialProvider;
+ std::unique_ptr<S3Client> m_Client;
+ std::filesystem::path m_TempDir;
+ uint64_t m_MultipartChunkSize = DefaultMultipartChunkSize;
+ };
- ZEN_ASSERT(!m_Bucket.empty());
+} // namespace hydration_impl
- std::string Region = std::string(Settings["region"].AsString());
- if (Region.empty())
- {
- Region = GetEnvVariable("AWS_DEFAULT_REGION");
- }
- if (Region.empty())
- {
- Region = GetEnvVariable("AWS_REGION");
- }
- if (Region.empty())
- {
- Region = "us-east-1";
- }
- m_Region = std::move(Region);
+using namespace hydration_impl;
- std::string AccessKeyId = GetEnvVariable("AWS_ACCESS_KEY_ID");
- if (AccessKeyId.empty())
- {
- m_CredentialProvider = Ref<ImdsCredentialProvider>(new ImdsCredentialProvider({}));
- }
- else
- {
- m_Credentials.AccessKeyId = std::move(AccessKeyId);
- m_Credentials.SecretAccessKey = GetEnvVariable("AWS_SECRET_ACCESS_KEY");
- m_Credentials.SessionToken = GetEnvVariable("AWS_SESSION_TOKEN");
- }
-}
+///////////////////////////////////////////////////////////////////////////
-S3Client
-S3Hydrator::CreateS3Client() const
+class IncrementalHydrator : public HydrationStrategyBase
{
- S3ClientOptions Options;
- Options.BucketName = m_Bucket;
- Options.Region = m_Region;
+public:
+ IncrementalHydrator(std::unique_ptr<StorageBase>&& Storage);
+ virtual ~IncrementalHydrator() override;
+ virtual void Configure(const HydrationConfig& Config) override;
+ virtual void Dehydrate(const CbObject& CachedState) override;
+ virtual CbObject Hydrate() override;
- CbObjectView Settings = m_Config.Options["settings"].AsObjectView();
- std::string_view Endpoint = Settings["endpoint"].AsString();
- if (!Endpoint.empty())
- {
- Options.Endpoint = std::string(Endpoint);
- Options.PathStyle = Settings["path-style"].AsBool();
- }
-
- if (m_CredentialProvider)
- {
- Options.CredentialProvider = m_CredentialProvider;
- }
- else
+private:
+ struct Entry
{
- Options.Credentials = m_Credentials;
- }
+ std::filesystem::path RelativePath;
+ uint64_t Size;
+ uint64_t ModTick;
+ IoHash Hash;
+ };
- Options.HttpSettings.MaximumInMemoryDownloadSize = 16u * 1024u;
+ std::unique_ptr<StorageBase> m_Storage;
+ HydrationConfig m_Config;
+ WorkerThreadPool m_FallbackWorkPool;
+ std::atomic<bool> m_FallbackAbortFlag{false};
+ std::atomic<bool> m_FallbackPauseFlag{false};
+ HydrationConfig::ThreadingOptions m_Threading{.WorkerPool = &m_FallbackWorkPool,
+ .AbortFlag = &m_FallbackAbortFlag,
+ .PauseFlag = &m_FallbackPauseFlag};
+};
- return S3Client(Options);
+IncrementalHydrator::IncrementalHydrator(std::unique_ptr<StorageBase>&& Storage) : m_Storage(std::move(Storage)), m_FallbackWorkPool(0)
+{
}
-std::string
-S3Hydrator::BuildTimestampFolderName() const
+IncrementalHydrator::~IncrementalHydrator()
{
- UtcTime Now = UtcTime::Now();
- return fmt::format("{:04d}{:02d}{:02d}-{:02d}{:02d}{:02d}-{:03d}",
- Now.Tm.tm_year + 1900,
- Now.Tm.tm_mon + 1,
- Now.Tm.tm_mday,
- Now.Tm.tm_hour,
- Now.Tm.tm_min,
- Now.Tm.tm_sec,
- Now.Ms);
+ m_Storage.reset();
}
-std::string
-S3Hydrator::MakeObjectKey(std::string_view FolderName, const std::filesystem::path& RelPath) const
+void
+IncrementalHydrator::Configure(const HydrationConfig& Config)
{
- return m_KeyPrefix + "/" + std::string(FolderName) + "/" + RelPath.generic_string();
+ m_Config = Config;
+ m_Storage->Configure(Config.ModuleId, Config.TempDir, Config.TargetSpecification, Config.Options);
+ if (Config.Threading)
+ {
+ m_Threading = *Config.Threading;
+ }
}
void
-S3Hydrator::Dehydrate()
+IncrementalHydrator::Dehydrate(const CbObject& CachedState)
{
- ZEN_INFO("Dehydrating state from '{}' to s3://{}/{}", m_Config.ServerStateDir, m_Bucket, m_KeyPrefix);
+ Stopwatch Timer;
+ const std::filesystem::path ServerStateDir = MakeSafeAbsolutePath(m_Config.ServerStateDir);
try
{
- S3Client Client = CreateS3Client();
- std::string FolderName = BuildTimestampFolderName();
- uint64_t TotalBytes = 0;
- uint32_t FileCount = 0;
- Stopwatch Timer;
+ std::unordered_map<std::string, size_t> StateEntryLookup;
+ std::vector<Entry> StateEntries;
+ for (CbFieldView FieldView : CachedState["Files"].AsArrayView())
+ {
+ CbObjectView EntryView = FieldView.AsObjectView();
+ std::filesystem::path RelativePath(EntryView["Path"].AsString());
+ uint64_t Size = EntryView["Size"].AsUInt64();
+ uint64_t ModTick = EntryView["ModTick"].AsUInt64();
+ IoHash Hash = EntryView["Hash"].AsHash();
+
+ StateEntryLookup.insert_or_assign(RelativePath.generic_string(), StateEntries.size());
+ StateEntries.push_back(Entry{.RelativePath = RelativePath, .Size = Size, .ModTick = ModTick, .Hash = Hash});
+ }
DirectoryContent DirContent;
- GetDirectoryContent(m_Config.ServerStateDir, DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::Recursive, DirContent);
+ GetDirectoryContent(*m_Threading.WorkerPool,
+ ServerStateDir,
+ DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::Recursive |
+ DirectoryContentFlags::IncludeFileSizes | DirectoryContentFlags::IncludeModificationTick,
+ DirContent);
+
+ ZEN_INFO("Dehydrating module '{}' from folder '{}'. {} ({}) files",
+ m_Config.ModuleId,
+ m_Config.ServerStateDir,
+ DirContent.Files.size(),
+ NiceBytes(std::accumulate(DirContent.FileSizes.begin(), DirContent.FileSizes.end(), uint64_t(0))));
+
+ std::vector<Entry> Entries;
+ Entries.resize(DirContent.Files.size());
+
+ uint64_t TotalBytes = 0;
+ uint64_t TotalFiles = 0;
+ uint64_t HashedFiles = 0;
+ uint64_t HashedBytes = 0;
+
+ std::unordered_set<IoHash> ExistsLookup;
- for (const std::filesystem::path& AbsPath : DirContent.Files)
{
- std::filesystem::path RelPath = AbsPath.lexically_relative(m_Config.ServerStateDir);
- if (RelPath.empty() || *RelPath.begin() == "..")
- {
- throw zen::runtime_error(
- "lexically_relative produced a '..'-escape path for '{}' relative to '{}' - "
- "path form mismatch (e.g. \\\\?\\ prefix on one but not the other)",
- AbsPath.string(),
- m_Config.ServerStateDir.string());
- }
- if (*RelPath.begin() == ".sentry-native")
+ ParallelWork Work(*m_Threading.AbortFlag, *m_Threading.PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
+
+ for (size_t FileIndex = 0; FileIndex < DirContent.Files.size(); FileIndex++)
{
- continue;
- }
- std::string Key = MakeObjectKey(FolderName, RelPath);
+ const std::filesystem::path AbsPath = MakeSafeAbsolutePath(DirContent.Files[FileIndex]);
+ if (AbsPath.filename() == "reserve.gc")
+ {
+ continue;
+ }
+ const std::filesystem::path RelativePath = FastRelativePath(ServerStateDir, DirContent.Files[FileIndex]);
+ if (*RelativePath.begin() == ".sentry-native")
+ {
+ continue;
+ }
+ if (RelativePath == ".lock")
+ {
+ continue;
+ }
- BasicFile File(AbsPath, BasicFile::Mode::kRead);
- uint64_t FileSize = File.FileSize();
+ Entry& CurrentEntry = Entries[TotalFiles];
+ CurrentEntry.RelativePath = RelativePath;
+ CurrentEntry.Size = DirContent.FileSizes[FileIndex];
+ CurrentEntry.ModTick = DirContent.FileModificationTicks[FileIndex];
- S3Result UploadResult = Client.PutObjectMultipart(
- Key,
- FileSize,
- [&File](uint64_t Offset, uint64_t Size) { return File.ReadRange(Offset, Size); },
- MultipartChunkSize);
- if (!UploadResult.IsSuccess())
- {
- throw zen::runtime_error("Failed to upload '{}' to S3: {}", Key, UploadResult.Error);
+ bool FoundHash = false;
+ if (auto KnownIt = StateEntryLookup.find(CurrentEntry.RelativePath.generic_string()); KnownIt != StateEntryLookup.end())
+ {
+ const Entry& StateEntry = StateEntries[KnownIt->second];
+ if (StateEntry.Size == CurrentEntry.Size && StateEntry.ModTick == CurrentEntry.ModTick)
+ {
+ CurrentEntry.Hash = StateEntry.Hash;
+ FoundHash = true;
+ }
+ }
+
+ if (!FoundHash)
+ {
+ Work.ScheduleWork(*m_Threading.WorkerPool, [AbsPath, EntryIndex = TotalFiles, &Entries](std::atomic<bool>& AbortFlag) {
+ if (AbortFlag)
+ {
+ return;
+ }
+
+ Entry& CurrentEntry = Entries[EntryIndex];
+
+ bool FoundHash = false;
+ if (AbsPath.extension().empty())
+ {
+ auto It = CurrentEntry.RelativePath.begin();
+ if (It != CurrentEntry.RelativePath.end() && It->filename().string().ends_with("cas"))
+ {
+ IoHash RawHash;
+ uint64_t RawSize;
+ CompressedBuffer Compressed =
+ CompressedBuffer::FromCompressed(SharedBuffer(IoBufferBuilder::MakeFromFile(AbsPath)),
+ RawHash,
+ RawSize);
+ if (Compressed)
+ {
+ // We compose a meta-hash since taking the RawHash might collide with an existing
+ // non-compressed file with the same content The collision is unlikely except if the
+ // compressed data is zero bytes causing RawHash to be the same as an empty file.
+ IoHashStream Hasher;
+ Hasher.Append(RawHash.Hash, sizeof(RawHash.Hash));
+ Hasher.Append(&CurrentEntry.Size, sizeof(CurrentEntry.Size));
+ CurrentEntry.Hash = Hasher.GetHash();
+ FoundHash = true;
+ }
+ }
+ }
+
+ if (!FoundHash)
+ {
+ CurrentEntry.Hash = IoHash::HashBuffer(IoBufferBuilder::MakeFromFile(AbsPath));
+ }
+ });
+ HashedFiles++;
+ HashedBytes += CurrentEntry.Size;
+ }
+ TotalFiles++;
+ TotalBytes += CurrentEntry.Size;
}
- TotalBytes += FileSize;
- ++FileCount;
+ std::vector<IoHash> ExistingEntries = m_Storage->List();
+ ExistsLookup.insert(ExistingEntries.begin(), ExistingEntries.end());
+
+ Work.Wait();
+
+ Entries.resize(TotalFiles);
}
- // Write current-state.json
- uint64_t UploadDurationMs = Timer.GetElapsedTimeMs();
-
- UtcTime Now = UtcTime::Now();
- std::string UploadTimeUtc = fmt::format("{:04d}-{:02d}-{:02d}T{:02d}:{:02d}:{:02d}.{:03d}Z",
- Now.Tm.tm_year + 1900,
- Now.Tm.tm_mon + 1,
- Now.Tm.tm_mday,
- Now.Tm.tm_hour,
- Now.Tm.tm_min,
- Now.Tm.tm_sec,
- Now.Ms);
-
- CbObjectWriter Meta;
- Meta << "FolderName" << FolderName;
- Meta << "ModuleId" << m_Config.ModuleId;
- Meta << "HostName" << GetMachineName();
- Meta << "UploadTimeUtc" << UploadTimeUtc;
- Meta << "UploadDurationMs" << UploadDurationMs;
- Meta << "TotalSizeBytes" << TotalBytes;
- Meta << "FileCount" << FileCount;
-
- ExtendableStringBuilder<1024> JsonBuilder;
- Meta.Save().ToJson(JsonBuilder);
-
- std::string MetaKey = m_KeyPrefix + "/current-state.json";
- std::string_view JsonText = JsonBuilder.ToView();
- IoBuffer MetaBuf(IoBuffer::Clone, JsonText.data(), JsonText.size());
- S3Result MetaUploadResult = Client.PutObject(MetaKey, std::move(MetaBuf));
- if (!MetaUploadResult.IsSuccess())
+ uint64_t UploadedFiles = 0;
+ uint64_t UploadedBytes = 0;
{
- throw zen::runtime_error("Failed to write current-state.json to '{}': {}", MetaKey, MetaUploadResult.Error);
+ ParallelWork Work(*m_Threading.AbortFlag, *m_Threading.PauseFlag, WorkerThreadPool::EMode::DisableBacklog);
+
+ for (const Entry& CurrentEntry : Entries)
+ {
+ if (!ExistsLookup.contains(CurrentEntry.Hash))
+ {
+ m_Storage->Put(Work,
+ *m_Threading.WorkerPool,
+ CurrentEntry.Hash,
+ CurrentEntry.Size,
+ MakeSafeAbsolutePath(ServerStateDir / CurrentEntry.RelativePath));
+ UploadedFiles++;
+ UploadedBytes += CurrentEntry.Size;
+ }
+ }
+
+ Work.Wait();
+ uint64_t UploadTimeMs = Timer.GetElapsedTimeMs();
+
+ UtcTime Now = UtcTime::Now();
+ std::string UploadTimeUtc = fmt::format("{:04d}-{:02d}-{:02d}T{:02d}:{:02d}:{:02d}.{:03d}Z",
+ Now.Tm.tm_year + 1900,
+ Now.Tm.tm_mon + 1,
+ Now.Tm.tm_mday,
+ Now.Tm.tm_hour,
+ Now.Tm.tm_min,
+ Now.Tm.tm_sec,
+ Now.Ms);
+
+ CbObjectWriter Meta;
+ Meta << "SourceFolder" << ServerStateDir.generic_string();
+ Meta << "ModuleId" << m_Config.ModuleId;
+ Meta << "HostName" << GetMachineName();
+ Meta << "UploadTimeUtc" << UploadTimeUtc;
+ Meta << "UploadDurationMs" << UploadTimeMs;
+ Meta << "TotalSizeBytes" << TotalBytes;
+ Meta << "StorageSettings" << m_Storage->GetSettings();
+
+ Meta.BeginArray("Files");
+ for (const Entry& CurrentEntry : Entries)
+ {
+ Meta.BeginObject();
+ {
+ Meta << "Path" << CurrentEntry.RelativePath.generic_string();
+ Meta << "Size" << CurrentEntry.Size;
+ Meta << "ModTick" << CurrentEntry.ModTick;
+ Meta << "Hash" << CurrentEntry.Hash;
+ }
+ Meta.EndObject();
+ }
+ Meta.EndArray();
+
+ m_Storage->SaveMetadata(Meta.Save());
}
- ZEN_INFO("Dehydration complete: {} files, {}, {}", FileCount, NiceBytes(TotalBytes), NiceTimeSpanMs(UploadDurationMs));
+ ZEN_DEBUG("Cleaning server state '{}'", m_Config.ServerStateDir);
+ CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, ServerStateDir);
+
+ ZEN_INFO("Dehydration of module '{}' completed from folder '{}'. Hashed {} ({}). Uploaded {} ({}). Total {} ({}) in {}",
+ m_Config.ModuleId,
+ m_Config.ServerStateDir,
+ HashedFiles,
+ NiceBytes(HashedBytes),
+ UploadedFiles,
+ NiceBytes(UploadedBytes),
+ TotalFiles,
+ NiceBytes(TotalBytes),
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
}
- catch (std::exception& Ex)
+ catch (const std::exception& Ex)
{
- // Any in-progress multipart upload has already been aborted by PutObjectMultipart.
- // current-state.json is only written on success, so the previous S3 state remains valid.
- ZEN_WARN("S3 dehydration failed: {}. S3 state not updated.", Ex.what());
+ ZEN_WARN("Dehydration of module '{}' failed: {}. Leaving server state '{}'", m_Config.ModuleId, Ex.what(), m_Config.ServerStateDir);
}
}
-void
-S3Hydrator::Hydrate()
+CbObject
+IncrementalHydrator::Hydrate()
{
- ZEN_INFO("Hydrating state from s3://{}/{} to '{}'", m_Bucket, m_KeyPrefix, m_Config.ServerStateDir);
-
- Stopwatch Timer;
- const bool ForceRemoveReadOnlyFiles = true;
-
- // Clean temp dir before starting in case of leftover state from a previous failed hydration
- ZEN_DEBUG("Cleaning temp dir '{}'", m_Config.TempDir);
- CleanDirectory(m_Config.TempDir, ForceRemoveReadOnlyFiles);
-
- bool WipeServerState = false;
+ Stopwatch Timer;
+ const std::filesystem::path ServerStateDir = MakeSafeAbsolutePath(m_Config.ServerStateDir);
+ const std::filesystem::path TempDir = MakeSafeAbsolutePath(m_Config.TempDir);
try
{
- S3Client Client = CreateS3Client();
- std::string MetaKey = m_KeyPrefix + "/current-state.json";
-
- S3GetObjectResult MetaResult = Client.GetObject(MetaKey);
- if (!MetaResult.IsSuccess())
- {
- if (MetaResult.Error == S3GetObjectResult::NotFoundErrorText)
- {
- ZEN_INFO("No state found in S3 at {}", MetaKey);
-
- ZEN_DEBUG("Wiping server state '{}'", m_Config.ServerStateDir);
- CleanDirectory(m_Config.ServerStateDir, ForceRemoveReadOnlyFiles);
- return;
- }
- throw zen::runtime_error("Failed to read current-state.json from '{}': {}", MetaKey, MetaResult.Error);
- }
-
- std::string ParseError;
- json11::Json MetaJson = json11::Json::parse(std::string(MetaResult.AsText()), ParseError);
- if (!ParseError.empty())
+ CbObject Meta = m_Storage->LoadMetadata();
+ if (!Meta)
{
- throw zen::runtime_error("Failed to parse current-state.json from '{}': {}", MetaKey, ParseError);
+ ZEN_INFO("No dehydrated state for module {} found, cleaning server state: '{}'", m_Config.ModuleId, m_Config.ServerStateDir);
+ CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, ServerStateDir);
+ return CbObject();
}
- std::string FolderName = MetaJson["FolderName"].string_value();
- if (FolderName.empty())
- {
- throw zen::runtime_error("current-state.json from '{}' has missing or empty FolderName", MetaKey);
- }
+ std::unordered_map<std::string, size_t> EntryLookup;
+ std::vector<Entry> Entries;
+ uint64_t TotalSize = 0;
- std::string FolderPrefix = m_KeyPrefix + "/" + FolderName + "/";
- S3ListObjectsResult ListResult = Client.ListObjects(FolderPrefix);
- if (!ListResult.IsSuccess())
+ for (CbFieldView FieldView : Meta["Files"])
{
- throw zen::runtime_error("Failed to list S3 objects under '{}': {}", FolderPrefix, ListResult.Error);
- }
-
- for (const S3ObjectInfo& Obj : ListResult.Objects)
- {
- if (!Obj.Key.starts_with(FolderPrefix))
- {
- ZEN_WARN("Skipping unexpected S3 key '{}' (expected prefix '{}')", Obj.Key, FolderPrefix);
- continue;
- }
-
- std::string RelKey = Obj.Key.substr(FolderPrefix.size());
- if (RelKey.empty())
+ CbObjectView EntryView = FieldView.AsObjectView();
+ if (EntryView)
{
- continue;
+ Entry NewEntry = {.RelativePath = std::filesystem::path(EntryView["Path"].AsString()),
+ .Size = EntryView["Size"].AsUInt64(),
+ .ModTick = EntryView["ModTick"].AsUInt64(),
+ .Hash = EntryView["Hash"].AsHash()};
+ TotalSize += NewEntry.Size;
+ EntryLookup.insert_or_assign(NewEntry.RelativePath.generic_string(), Entries.size());
+ Entries.emplace_back(std::move(NewEntry));
}
- std::filesystem::path DestPath = MakeSafeAbsolutePath(m_Config.TempDir / std::filesystem::path(RelKey));
- CreateDirectories(DestPath.parent_path());
+ }
- if (Obj.Size > MultipartChunkSize)
- {
- BasicFile DestFile(DestPath, BasicFile::Mode::kTruncate);
- DestFile.SetFileSize(Obj.Size);
+ ZEN_INFO("Hydrating module '{}' to folder '{}'. {} ({}) files",
+ m_Config.ModuleId,
+ m_Config.ServerStateDir,
+ Entries.size(),
+ NiceBytes(TotalSize));
- BasicFileWriter Writer(DestFile, 64 * 1024);
+ m_Storage->ParseSettings(Meta["StorageSettings"].AsObjectView());
- uint64_t Offset = 0;
- while (Offset < Obj.Size)
- {
- uint64_t ChunkSize = std::min<uint64_t>(MultipartChunkSize, Obj.Size - Offset);
- S3GetObjectResult Chunk = Client.GetObjectRange(Obj.Key, Offset, ChunkSize);
- if (!Chunk.IsSuccess())
- {
- throw zen::runtime_error("Failed to download '{}' bytes [{}-{}] from S3: {}",
- Obj.Key,
- Offset,
- Offset + ChunkSize - 1,
- Chunk.Error);
- }
+ uint64_t DownloadedBytes = 0;
+ uint64_t DownloadedFiles = 0;
- Writer.Write(Chunk.Content.GetData(), Chunk.Content.GetSize(), Offset);
- Offset += ChunkSize;
- }
+ {
+ ParallelWork Work(*m_Threading.AbortFlag, *m_Threading.PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
- Writer.Flush();
- }
- else
+ for (const Entry& CurrentEntry : Entries)
{
- S3GetObjectResult Chunk = Client.GetObject(Obj.Key, m_Config.TempDir);
- if (!Chunk.IsSuccess())
- {
- throw zen::runtime_error("Failed to download '{}' from S3: {}", Obj.Key, Chunk.Error);
- }
-
- if (IoBufferFileReference FileRef; Chunk.Content.GetFileReference(FileRef))
- {
- std::error_code Ec;
- std::filesystem::path ChunkPath = PathFromHandle(FileRef.FileHandle, Ec);
- if (Ec)
- {
- WriteFile(DestPath, Chunk.Content);
- }
- else
- {
- Chunk.Content.SetDeleteOnClose(false);
- Chunk.Content = {};
- RenameFile(ChunkPath, DestPath, Ec);
- }
- }
- else
- {
- WriteFile(DestPath, Chunk.Content);
- }
+ std::filesystem::path Path = MakeSafeAbsolutePath(TempDir / CurrentEntry.RelativePath);
+ CreateDirectories(Path.parent_path());
+ m_Storage->Get(Work, *m_Threading.WorkerPool, CurrentEntry.Hash, CurrentEntry.Size, Path);
+ DownloadedBytes += CurrentEntry.Size;
+ DownloadedFiles++;
}
+
+ Work.Wait();
}
// Downloaded successfully - swap into ServerStateDir
- ZEN_DEBUG("Wiping server state '{}'", m_Config.ServerStateDir);
- CleanDirectory(m_Config.ServerStateDir, ForceRemoveReadOnlyFiles);
+ ZEN_DEBUG("Cleaning server state '{}'", m_Config.ServerStateDir);
+ CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, ServerStateDir);
// If the two paths share at least one common component they are on the same drive/volume
// and atomic renames will succeed. Otherwise fall back to a full copy.
- auto [ItTmp, ItState] =
- std::mismatch(m_Config.TempDir.begin(), m_Config.TempDir.end(), m_Config.ServerStateDir.begin(), m_Config.ServerStateDir.end());
- if (ItTmp != m_Config.TempDir.begin())
+ auto [ItTmp, ItState] = std::mismatch(TempDir.begin(), TempDir.end(), ServerStateDir.begin(), ServerStateDir.end());
+ if (ItTmp != TempDir.begin())
{
DirectoryContent DirContent;
- GetDirectoryContent(m_Config.TempDir, DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::IncludeDirs, DirContent);
+ GetDirectoryContent(*m_Threading.WorkerPool,
+ TempDir,
+ DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::IncludeDirs,
+ DirContent);
for (const std::filesystem::path& AbsPath : DirContent.Directories)
{
- std::filesystem::path Dest = MakeSafeAbsolutePath(m_Config.ServerStateDir / AbsPath.filename());
- RenameDirectory(AbsPath, Dest);
+ std::filesystem::path Dest = MakeSafeAbsolutePath(ServerStateDir / AbsPath.filename());
+ std::error_code Ec = RenameDirectoryWithRetry(AbsPath, Dest);
+ if (Ec)
+ {
+ throw std::system_error(Ec, fmt::format("Failed to rename directory from '{}' to '{}'", AbsPath, Dest));
+ }
}
for (const std::filesystem::path& AbsPath : DirContent.Files)
{
- std::filesystem::path Dest = MakeSafeAbsolutePath(m_Config.ServerStateDir / AbsPath.filename());
- RenameFile(AbsPath, Dest);
+ std::filesystem::path Dest = MakeSafeAbsolutePath(ServerStateDir / AbsPath.filename());
+ std::error_code Ec = RenameFileWithRetry(AbsPath, Dest);
+ if (Ec)
+ {
+ throw std::system_error(Ec, fmt::format("Failed to rename file from '{}' to '{}'", AbsPath, Dest));
+ }
}
ZEN_DEBUG("Cleaning temp dir '{}'", m_Config.TempDir);
- CleanDirectory(m_Config.TempDir, ForceRemoveReadOnlyFiles);
+ CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, TempDir);
}
else
{
// Slow path: TempDir and ServerStateDir are on different filesystems, so rename
// would fail. Copy the tree instead and clean up the temp files afterwards.
ZEN_DEBUG("TempDir and ServerStateDir are on different filesystems - using CopyTree");
- CopyTree(m_Config.TempDir, m_Config.ServerStateDir, {.EnableClone = true});
+ CopyTree(TempDir, ServerStateDir, {.EnableClone = true});
ZEN_DEBUG("Cleaning temp dir '{}'", m_Config.TempDir);
- CleanDirectory(m_Config.TempDir, ForceRemoveReadOnlyFiles);
+ CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, TempDir);
}
- ZEN_INFO("Hydration complete from folder '{}' in {}", FolderName, NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
- }
- catch (std::exception& Ex)
- {
- ZEN_WARN("S3 hydration failed: {}. Will wipe any partially installed state.", Ex.what());
+ // TODO: This could perhaps be done more efficently, but ok for now
+ DirectoryContent DirContent;
+ GetDirectoryContent(*m_Threading.WorkerPool,
+ ServerStateDir,
+ DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::Recursive |
+ DirectoryContentFlags::IncludeFileSizes | DirectoryContentFlags::IncludeModificationTick,
+ DirContent);
+
+ CbObjectWriter HydrateState;
+ HydrateState.BeginArray("Files");
+ for (size_t FileIndex = 0; FileIndex < DirContent.Files.size(); FileIndex++)
+ {
+ std::filesystem::path RelativePath = FastRelativePath(ServerStateDir, DirContent.Files[FileIndex]);
- // We don't do the clean right here to avoid potentially running into double-throws
- WipeServerState = true;
- }
+ if (auto It = EntryLookup.find(RelativePath.generic_string()); It != EntryLookup.end())
+ {
+ HydrateState.BeginObject();
+ {
+ HydrateState << "Path" << RelativePath.generic_string();
+ HydrateState << "Size" << DirContent.FileSizes[FileIndex];
+ HydrateState << "ModTick" << DirContent.FileModificationTicks[FileIndex];
+ HydrateState << "Hash" << Entries[It->second].Hash;
+ }
+ HydrateState.EndObject();
+ }
+ else
+ {
+ ZEN_ASSERT(false);
+ }
+ }
+ HydrateState.EndArray();
+
+ CbObject StateObject = HydrateState.Save();
- if (WipeServerState)
+ ZEN_INFO("Hydration of module '{}' complete to folder '{}'. {} ({}) files in {}",
+ m_Config.ModuleId,
+ m_Config.ServerStateDir,
+ DownloadedFiles,
+ NiceBytes(DownloadedBytes),
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+
+ return StateObject;
+ }
+ catch (const std::exception& Ex)
{
- ZEN_DEBUG("Cleaning server state '{}'", m_Config.ServerStateDir);
- CleanDirectory(m_Config.ServerStateDir, ForceRemoveReadOnlyFiles);
+ ZEN_WARN("Hydration of module '{}' failed: {}. Cleaning server state '{}'", m_Config.ModuleId, Ex.what(), m_Config.ServerStateDir);
+ CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, ServerStateDir);
ZEN_DEBUG("Cleaning temp dir '{}'", m_Config.TempDir);
- CleanDirectory(m_Config.TempDir, ForceRemoveReadOnlyFiles);
+ CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, TempDir);
+ return {};
}
}
std::unique_ptr<HydrationStrategyBase>
CreateHydrator(const HydrationConfig& Config)
{
+ std::unique_ptr<StorageBase> Storage;
+
if (!Config.TargetSpecification.empty())
{
if (StrCaseCompare(Config.TargetSpecification.substr(0, FileHydratorPrefix.length()), FileHydratorPrefix) == 0)
{
- std::unique_ptr<HydrationStrategyBase> Hydrator = std::make_unique<FileHydrator>();
- Hydrator->Configure(Config);
- return Hydrator;
+ Storage = std::make_unique<FileStorage>();
}
- if (StrCaseCompare(Config.TargetSpecification.substr(0, S3HydratorPrefix.length()), S3HydratorPrefix) == 0)
+ else if (StrCaseCompare(Config.TargetSpecification.substr(0, S3HydratorPrefix.length()), S3HydratorPrefix) == 0)
{
- std::unique_ptr<HydrationStrategyBase> Hydrator = std::make_unique<S3Hydrator>();
- Hydrator->Configure(Config);
- return Hydrator;
+ Storage = std::make_unique<S3Storage>();
+ }
+ else
+ {
+ throw std::runtime_error(fmt::format("Unknown hydration strategy: {}", Config.TargetSpecification));
}
- throw std::runtime_error(fmt::format("Unknown hydration strategy: {}", Config.TargetSpecification));
- }
-
- std::string_view Type = Config.Options["type"].AsString();
- if (Type == FileHydratorType)
- {
- std::unique_ptr<HydrationStrategyBase> Hydrator = std::make_unique<FileHydrator>();
- Hydrator->Configure(Config);
- return Hydrator;
- }
- if (Type == S3HydratorType)
- {
- std::unique_ptr<HydrationStrategyBase> Hydrator = std::make_unique<S3Hydrator>();
- Hydrator->Configure(Config);
- return Hydrator;
}
- if (!Type.empty())
+ else
{
- throw zen::runtime_error("Unknown hydration target type '{}'", Type);
+ std::string_view Type = Config.Options["type"].AsString();
+ if (Type == FileHydratorType)
+ {
+ Storage = std::make_unique<FileStorage>();
+ }
+ else if (Type == S3HydratorType)
+ {
+ Storage = std::make_unique<S3Storage>();
+ }
+ else if (!Type.empty())
+ {
+ throw zen::runtime_error("Unknown hydration target type '{}'", Type);
+ }
+ else
+ {
+ throw zen::runtime_error("No hydration target configured");
+ }
}
- throw zen::runtime_error("No hydration target configured");
+
+ auto Hydrator = std::make_unique<IncrementalHydrator>(std::move(Storage));
+ Hydrator->Configure(Config);
+ return Hydrator;
}
#if ZEN_WITH_TESTS
namespace {
+ struct TestThreading
+ {
+ WorkerThreadPool WorkerPool;
+ std::atomic<bool> AbortFlag{false};
+ std::atomic<bool> PauseFlag{false};
+ HydrationConfig::ThreadingOptions Options{.WorkerPool = &WorkerPool, .AbortFlag = &AbortFlag, .PauseFlag = &PauseFlag};
+
+ explicit TestThreading(int ThreadCount) : WorkerPool(ThreadCount) {}
+ };
+
/// Scoped RAII helper to set/restore a single environment variable within a test.
/// Used to configure AWS credentials for each S3 test's MinIO instance
/// without polluting the global environment.
@@ -720,10 +1107,10 @@ namespace {
/// subdir/file_b.bin
/// subdir/nested/file_c.bin
/// Returns a vector of (relative path, content) pairs for later verification.
- std::vector<std::pair<std::filesystem::path, IoBuffer>> CreateTestTree(const std::filesystem::path& BaseDir)
- {
- std::vector<std::pair<std::filesystem::path, IoBuffer>> Files;
+ typedef std::vector<std::pair<std::filesystem::path, IoBuffer>> TestFileList;
+ TestFileList AddTestFiles(const std::filesystem::path& BaseDir, TestFileList& Files)
+ {
auto AddFile = [&](std::filesystem::path RelPath, IoBuffer Content) {
std::filesystem::path FullPath = BaseDir / RelPath;
CreateDirectories(FullPath.parent_path());
@@ -737,9 +1124,33 @@ namespace {
AddFile("subdir/nested/file_d.bin", CreateSemiRandomBlob(512));
AddFile("subdir/nested/file_e.bin", CreateSemiRandomBlob(512));
AddFile("subdir/nested/file_f.bin", CreateSemiRandomBlob(512));
+
+ return Files;
+ }
+
+ TestFileList CreateSmallTestTree(const std::filesystem::path& BaseDir)
+ {
+ TestFileList Files;
+ AddTestFiles(BaseDir, Files);
+ return Files;
+ }
+
+ TestFileList CreateTestTree(const std::filesystem::path& BaseDir)
+ {
+ TestFileList Files;
+ AddTestFiles(BaseDir, Files);
+
+ auto AddFile = [&](std::filesystem::path RelPath, IoBuffer Content) {
+ std::filesystem::path FullPath = BaseDir / RelPath;
+ CreateDirectories(FullPath.parent_path());
+ WriteFile(FullPath, Content);
+ Files.emplace_back(std::move(RelPath), std::move(Content));
+ };
+
AddFile("subdir/nested/medium.bulk", CreateSemiRandomBlob(256u * 1024u));
AddFile("subdir/nested/big.bulk", CreateSemiRandomBlob(512u * 1024u));
AddFile("subdir/nested/huge.bulk", CreateSemiRandomBlob(9u * 1024u * 1024u));
+ AddFile("subdir/nested/biggest.bulk", CreateSemiRandomBlob(63u * 1024u * 1024u));
return Files;
}
@@ -777,7 +1188,7 @@ TEST_CASE("hydration.file.dehydrate_hydrate")
CreateDirectories(HydrationTemp);
const std::string ModuleId = "testmodule";
- auto TestFiles = CreateTestTree(ServerStateDir);
+ auto TestFiles = CreateSmallTestTree(ServerStateDir);
HydrationConfig Config;
Config.ServerStateDir = ServerStateDir;
@@ -788,7 +1199,7 @@ TEST_CASE("hydration.file.dehydrate_hydrate")
// Dehydrate: copy server state to file store
{
std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
- Hydrator->Dehydrate();
+ Hydrator->Dehydrate(CbObject());
}
// Verify the module folder exists in the store and ServerStateDir was wiped
@@ -805,7 +1216,7 @@ TEST_CASE("hydration.file.dehydrate_hydrate")
VerifyTree(ServerStateDir, TestFiles);
}
-TEST_CASE("hydration.file.dehydrate_cleans_server_state")
+TEST_CASE("hydration.file.hydrate_overwrites_existing_state")
{
ScopedTemporaryDirectory TempDir;
@@ -816,7 +1227,7 @@ TEST_CASE("hydration.file.dehydrate_cleans_server_state")
CreateDirectories(HydrationStore);
CreateDirectories(HydrationTemp);
- CreateTestTree(ServerStateDir);
+ auto TestFiles = CreateSmallTestTree(ServerStateDir);
HydrationConfig Config;
Config.ServerStateDir = ServerStateDir;
@@ -824,14 +1235,26 @@ TEST_CASE("hydration.file.dehydrate_cleans_server_state")
Config.ModuleId = "testmodule";
Config.TargetSpecification = "file://" + HydrationStore.string();
- std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
- Hydrator->Dehydrate();
+ // Dehydrate the original state
+ {
+ std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
+ Hydrator->Dehydrate(CbObject());
+ }
- // FileHydrator::Dehydrate() must wipe ServerStateDir when done
- CHECK(std::filesystem::is_empty(ServerStateDir));
+ // Put a stale file in ServerStateDir to simulate leftover state
+ WriteFile(ServerStateDir / "stale.bin", CreateSemiRandomBlob(256));
+
+ // Hydrate - must wipe stale file and restore original
+ {
+ std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
+ Hydrator->Hydrate();
+ }
+
+ CHECK_FALSE(std::filesystem::exists(ServerStateDir / "stale.bin"));
+ VerifyTree(ServerStateDir, TestFiles);
}
-TEST_CASE("hydration.file.hydrate_overwrites_existing_state")
+TEST_CASE("hydration.file.excluded_files_not_dehydrated")
{
ScopedTemporaryDirectory TempDir;
@@ -842,31 +1265,37 @@ TEST_CASE("hydration.file.hydrate_overwrites_existing_state")
CreateDirectories(HydrationStore);
CreateDirectories(HydrationTemp);
- auto TestFiles = CreateTestTree(ServerStateDir);
+ auto TestFiles = CreateSmallTestTree(ServerStateDir);
+
+ // Add files that the dehydrator should skip
+ WriteFile(ServerStateDir / "reserve.gc", CreateSemiRandomBlob(64));
+ CreateDirectories(ServerStateDir / ".sentry-native");
+ WriteFile(ServerStateDir / ".sentry-native" / "db.lock", CreateSemiRandomBlob(32));
+ WriteFile(ServerStateDir / ".sentry-native" / "breadcrumb.json", CreateSemiRandomBlob(128));
HydrationConfig Config;
Config.ServerStateDir = ServerStateDir;
Config.TempDir = HydrationTemp;
- Config.ModuleId = "testmodule";
+ Config.ModuleId = "testmodule_excl";
Config.TargetSpecification = "file://" + HydrationStore.string();
- // Dehydrate the original state
{
std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
- Hydrator->Dehydrate();
+ Hydrator->Dehydrate(CbObject());
}
- // Put a stale file in ServerStateDir to simulate leftover state
- WriteFile(ServerStateDir / "stale.bin", CreateSemiRandomBlob(256));
-
- // Hydrate - must wipe stale file and restore original
+ // Hydrate into a clean directory
+ CleanDirectory(ServerStateDir, true);
{
std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
Hydrator->Hydrate();
}
- CHECK_FALSE(std::filesystem::exists(ServerStateDir / "stale.bin"));
+ // Normal files must be restored
VerifyTree(ServerStateDir, TestFiles);
+ // Excluded files must NOT be restored
+ CHECK_FALSE(std::filesystem::exists(ServerStateDir / "reserve.gc"));
+ CHECK_FALSE(std::filesystem::exists(ServerStateDir / ".sentry-native"));
}
// ---------------------------------------------------------------------------
@@ -883,6 +1312,8 @@ TEST_CASE("hydration.file.concurrent")
std::filesystem::path HydrationStore = TempDir.Path() / "hydration_store";
CreateDirectories(HydrationStore);
+ TestThreading Threading(8);
+
struct ModuleData
{
HydrationConfig Config;
@@ -902,7 +1333,8 @@ TEST_CASE("hydration.file.concurrent")
Modules[I].Config.TempDir = TempPath;
Modules[I].Config.ModuleId = ModuleId;
Modules[I].Config.TargetSpecification = "file://" + HydrationStore.string();
- Modules[I].Files = CreateTestTree(StateDir);
+ Modules[I].Config.Threading = Threading.Options;
+ Modules[I].Files = CreateSmallTestTree(StateDir);
}
// Concurrent dehydrate
@@ -916,7 +1348,7 @@ TEST_CASE("hydration.file.concurrent")
{
Work.ScheduleWork(Pool, [&Config = Modules[I].Config](std::atomic<bool>&) {
std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
- Hydrator->Dehydrate();
+ Hydrator->Dehydrate(CbObject());
});
}
Work.Wait();
@@ -951,76 +1383,13 @@ TEST_CASE("hydration.file.concurrent")
// ---------------------------------------------------------------------------
// S3Hydrator tests
//
-// Each test case spawns its own local MinIO instance (self-contained, no external setup needed).
+// Each test case spawns a local MinIO instance (self-contained, no external setup needed).
// The MinIO binary must be present in the same directory as the test executable (copied by xmake).
// ---------------------------------------------------------------------------
TEST_CASE("hydration.s3.dehydrate_hydrate")
{
MinioProcessOptions MinioOpts;
- MinioOpts.Port = 19010;
- MinioProcess Minio(MinioOpts);
- Minio.SpawnMinioServer();
- Minio.CreateBucket("zen-hydration-test");
-
- ScopedEnvVar EnvAccessKey("AWS_ACCESS_KEY_ID", Minio.RootUser());
- ScopedEnvVar EnvSecretKey("AWS_SECRET_ACCESS_KEY", Minio.RootPassword());
-
- ScopedTemporaryDirectory TempDir;
-
- std::filesystem::path ServerStateDir = TempDir.Path() / "server_state";
- std::filesystem::path HydrationTemp = TempDir.Path() / "hydration_temp";
- CreateDirectories(ServerStateDir);
- CreateDirectories(HydrationTemp);
-
- const std::string ModuleId = "s3test_roundtrip";
- auto TestFiles = CreateTestTree(ServerStateDir);
-
- HydrationConfig Config;
- Config.ServerStateDir = ServerStateDir;
- Config.TempDir = HydrationTemp;
- Config.ModuleId = ModuleId;
- std::string ConfigJson =
- fmt::format(R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test","endpoint":"{}","path-style":true}}}})",
- Minio.Endpoint());
- std::string ParseError;
- CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError);
- ZEN_ASSERT(ParseError.empty() && Root.IsObject());
- Config.Options = std::move(Root).AsObject();
-
- // Dehydrate: upload server state to MinIO
- {
- std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
- Hydrator->Dehydrate();
- }
-
- // Wipe server state
- CleanDirectory(ServerStateDir, true);
- CHECK(std::filesystem::is_empty(ServerStateDir));
-
- // Hydrate: download from MinIO back to server state
- {
- std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
- Hydrator->Hydrate();
- }
-
- // Verify restored contents match the original
- VerifyTree(ServerStateDir, TestFiles);
-}
-
-TEST_CASE("hydration.s3.current_state_json_selects_latest_folder")
-{
- // Each Dehydrate() uploads files to a new timestamp-named folder and then overwrites
- // current-state.json to point at that folder. Old folders are NOT deleted.
- // Hydrate() must read current-state.json to determine which folder to restore from.
- //
- // This test verifies that:
- // 1. After two dehydrations, Hydrate() restores from the second snapshot, not the first,
- // confirming that current-state.json was updated between dehydrations.
- // 2. current-state.json is updated to point at the second (latest) folder.
- // 3. Hydrate() restores the v2 snapshot (identified by v2marker.bin), NOT the v1 snapshot.
-
- MinioProcessOptions MinioOpts;
MinioOpts.Port = 19011;
MinioProcess Minio(MinioOpts);
Minio.SpawnMinioServer();
@@ -1036,12 +1405,10 @@ TEST_CASE("hydration.s3.current_state_json_selects_latest_folder")
CreateDirectories(ServerStateDir);
CreateDirectories(HydrationTemp);
- const std::string ModuleId = "s3test_folder_select";
-
HydrationConfig Config;
Config.ServerStateDir = ServerStateDir;
Config.TempDir = HydrationTemp;
- Config.ModuleId = ModuleId;
+ Config.ModuleId = "s3test_roundtrip";
{
std::string ConfigJson =
fmt::format(R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test","endpoint":"{}","path-style":true}}}})",
@@ -1052,105 +1419,43 @@ TEST_CASE("hydration.s3.current_state_json_selects_latest_folder")
Config.Options = std::move(Root).AsObject();
}
- // v1: dehydrate without a marker file
- CreateTestTree(ServerStateDir);
+ // Hydrate with no prior S3 state (first-boot path). Pre-populate ServerStateDir
+ // with a stale file to confirm the cleanup branch wipes it.
+ WriteFile(ServerStateDir / "stale.bin", CreateSemiRandomBlob(256));
{
std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
- Hydrator->Dehydrate();
+ Hydrator->Hydrate();
}
+ CHECK(std::filesystem::is_empty(ServerStateDir));
- // ServerStateDir is now empty. Wait so the v2 timestamp folder name is strictly later
- // (timestamp resolution is 1 ms, but macOS scheduler granularity requires a larger margin).
- Sleep(100);
+ // v1: dehydrate without a marker file
+ CreateSmallTestTree(ServerStateDir);
+ {
+ std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
+ Hydrator->Dehydrate(CbObject());
+ }
// v2: dehydrate WITH a marker file that only v2 has
- CreateTestTree(ServerStateDir);
+ CreateSmallTestTree(ServerStateDir);
WriteFile(ServerStateDir / "v2marker.bin", CreateSemiRandomBlob(64));
{
std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
- Hydrator->Dehydrate();
+ Hydrator->Dehydrate(CbObject());
}
- // Hydrate must restore v2 (current-state.json points to the v2 folder)
+ // Hydrate must restore v2 (the latest dehydrated state)
CleanDirectory(ServerStateDir, true);
{
std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
Hydrator->Hydrate();
}
- // v2 marker must be present - confirms current-state.json pointed to the v2 folder
+ // v2 marker must be present - confirms the second dehydration overwrote the first
CHECK(std::filesystem::exists(ServerStateDir / "v2marker.bin"));
- // Subdirectory hierarchy must also be intact
CHECK(std::filesystem::exists(ServerStateDir / "subdir" / "file_b.bin"));
CHECK(std::filesystem::exists(ServerStateDir / "subdir" / "nested" / "file_c.bin"));
}
-TEST_CASE("hydration.s3.module_isolation")
-{
- // Two independent modules dehydrate/hydrate without interfering with each other.
- // Uses VerifyTree with per-module byte content to detect cross-module data mixing.
- MinioProcessOptions MinioOpts;
- MinioOpts.Port = 19012;
- MinioProcess Minio(MinioOpts);
- Minio.SpawnMinioServer();
- Minio.CreateBucket("zen-hydration-test");
-
- ScopedEnvVar EnvAccessKey("AWS_ACCESS_KEY_ID", Minio.RootUser());
- ScopedEnvVar EnvSecretKey("AWS_SECRET_ACCESS_KEY", Minio.RootPassword());
-
- ScopedTemporaryDirectory TempDir;
-
- struct ModuleData
- {
- HydrationConfig Config;
- std::vector<std::pair<std::filesystem::path, IoBuffer>> Files;
- };
-
- std::vector<ModuleData> Modules;
- for (const char* ModuleId : {"s3test_iso_a", "s3test_iso_b"})
- {
- std::filesystem::path StateDir = TempDir.Path() / ModuleId / "state";
- std::filesystem::path TempPath = TempDir.Path() / ModuleId / "temp";
- CreateDirectories(StateDir);
- CreateDirectories(TempPath);
-
- ModuleData Data;
- Data.Config.ServerStateDir = StateDir;
- Data.Config.TempDir = TempPath;
- Data.Config.ModuleId = ModuleId;
- {
- std::string ConfigJson =
- fmt::format(R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test","endpoint":"{}","path-style":true}}}})",
- Minio.Endpoint());
- std::string ParseError;
- CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError);
- ZEN_ASSERT(ParseError.empty() && Root.IsObject());
- Data.Config.Options = std::move(Root).AsObject();
- }
- Data.Files = CreateTestTree(StateDir);
-
- std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Data.Config);
- Hydrator->Dehydrate();
-
- Modules.push_back(std::move(Data));
- }
-
- for (ModuleData& Module : Modules)
- {
- CleanDirectory(Module.Config.ServerStateDir, true);
- std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Module.Config);
- Hydrator->Hydrate();
-
- // Each module's files must be independently restorable with correct byte content.
- // If S3 key prefixes were mixed up, CreateSemiRandomBlob content would differ.
- VerifyTree(Module.Config.ServerStateDir, Module.Files);
- }
-}
-
-// ---------------------------------------------------------------------------
-// S3Hydrator concurrent test
-// ---------------------------------------------------------------------------
-
TEST_CASE("hydration.s3.concurrent")
{
// N modules dehydrate and hydrate concurrently against MinIO.
@@ -1167,6 +1472,8 @@ TEST_CASE("hydration.s3.concurrent")
constexpr int kModuleCount = 16;
constexpr int kThreadCount = 4;
+ TestThreading Threading(kThreadCount);
+
ScopedTemporaryDirectory TempDir;
struct ModuleData
@@ -1187,6 +1494,7 @@ TEST_CASE("hydration.s3.concurrent")
Modules[I].Config.ServerStateDir = StateDir;
Modules[I].Config.TempDir = TempPath;
Modules[I].Config.ModuleId = ModuleId;
+ Modules[I].Config.Threading = Threading.Options;
{
std::string ConfigJson =
fmt::format(R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test","endpoint":"{}","path-style":true}}}})",
@@ -1210,7 +1518,7 @@ TEST_CASE("hydration.s3.concurrent")
{
Work.ScheduleWork(Pool, [&Config = Modules[I].Config](std::atomic<bool>&) {
std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
- Hydrator->Dehydrate();
+ Hydrator->Dehydrate(CbObject());
});
}
Work.Wait();
@@ -1243,17 +1551,10 @@ TEST_CASE("hydration.s3.concurrent")
}
}
-// ---------------------------------------------------------------------------
-// S3Hydrator: no prior state (first-boot path)
-// ---------------------------------------------------------------------------
-
-TEST_CASE("hydration.s3.no_prior_state")
+TEST_CASE("hydration.s3.config_overrides")
{
- // Hydrate() against an empty bucket (first-boot scenario) must leave ServerStateDir empty.
- // The "No state found in S3" path goes through the error-cleanup branch, which wipes
- // ServerStateDir to ensure no partial or stale content is left for the server to start on.
MinioProcessOptions MinioOpts;
- MinioOpts.Port = 19014;
+ MinioOpts.Port = 19015;
MinioProcess Minio(MinioOpts);
Minio.SpawnMinioServer();
Minio.CreateBucket("zen-hydration-test");
@@ -1268,42 +1569,82 @@ TEST_CASE("hydration.s3.no_prior_state")
CreateDirectories(ServerStateDir);
CreateDirectories(HydrationTemp);
- // Pre-populate ServerStateDir to confirm the wipe actually runs.
- WriteFile(ServerStateDir / "stale.bin", CreateSemiRandomBlob(256));
-
- HydrationConfig Config;
- Config.ServerStateDir = ServerStateDir;
- Config.TempDir = HydrationTemp;
- Config.ModuleId = "s3test_no_prior";
+ // Path prefix: "s3://bucket/some/prefix" stores objects under
+ // "some/prefix/<ModuleId>/..." rather than directly under "<ModuleId>/...".
{
- std::string ConfigJson =
- fmt::format(R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test","endpoint":"{}","path-style":true}}}})",
- Minio.Endpoint());
- std::string ParseError;
- CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError);
- ZEN_ASSERT(ParseError.empty() && Root.IsObject());
- Config.Options = std::move(Root).AsObject();
+ auto TestFiles = CreateSmallTestTree(ServerStateDir);
+
+ HydrationConfig Config;
+ Config.ServerStateDir = ServerStateDir;
+ Config.TempDir = HydrationTemp;
+ Config.ModuleId = "s3test_prefix";
+ {
+ std::string ConfigJson = fmt::format(
+ R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test/team/project","endpoint":"{}","path-style":true}}}})",
+ Minio.Endpoint());
+ std::string ParseError;
+ CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError);
+ ZEN_ASSERT(ParseError.empty() && Root.IsObject());
+ Config.Options = std::move(Root).AsObject();
+ }
+
+ {
+ std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
+ Hydrator->Dehydrate(CbObject());
+ }
+
+ CleanDirectory(ServerStateDir, true);
+
+ {
+ std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
+ Hydrator->Hydrate();
+ }
+
+ VerifyTree(ServerStateDir, TestFiles);
}
- std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
- Hydrator->Hydrate();
+ // Region override: 'region' in Options["settings"] takes precedence over AWS_DEFAULT_REGION.
+ // AWS_DEFAULT_REGION is set to a bogus value; hydration must succeed using the region from Options.
+ {
+ CleanDirectory(ServerStateDir, true);
+ auto TestFiles = CreateSmallTestTree(ServerStateDir);
- // ServerStateDir must be empty: the error path wipes it to prevent a server start
- // against stale or partially-installed content.
- CHECK(std::filesystem::is_empty(ServerStateDir));
-}
+ ScopedEnvVar EnvRegion("AWS_DEFAULT_REGION", "wrong-region");
-// ---------------------------------------------------------------------------
-// S3Hydrator: bucket path prefix in TargetSpecification
-// ---------------------------------------------------------------------------
+ HydrationConfig Config;
+ Config.ServerStateDir = ServerStateDir;
+ Config.TempDir = HydrationTemp;
+ Config.ModuleId = "s3test_region_override";
+ {
+ std::string ConfigJson = fmt::format(
+ R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test","endpoint":"{}","path-style":true,"region":"us-east-1"}}}})",
+ Minio.Endpoint());
+ std::string ParseError;
+ CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError);
+ ZEN_ASSERT(ParseError.empty() && Root.IsObject());
+ Config.Options = std::move(Root).AsObject();
+ }
-TEST_CASE("hydration.s3.path_prefix")
+ {
+ std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
+ Hydrator->Dehydrate(CbObject());
+ }
+
+ CleanDirectory(ServerStateDir, true);
+
+ {
+ std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
+ Hydrator->Hydrate();
+ }
+
+ VerifyTree(ServerStateDir, TestFiles);
+ }
+}
+
+TEST_CASE("hydration.s3.dehydrate_hydrate.performance" * doctest::skip())
{
- // TargetSpecification of the form "s3://bucket/some/prefix" stores objects under
- // "some/prefix/<ModuleId>/..." rather than directly under "<ModuleId>/...".
- // Tests the second branch of the m_KeyPrefix calculation in S3Hydrator::Configure().
MinioProcessOptions MinioOpts;
- MinioOpts.Port = 19015;
+ MinioOpts.Port = 19010;
MinioProcess Minio(MinioOpts);
Minio.SpawnMinioServer();
Minio.CreateBucket("zen-hydration-test");
@@ -1318,88 +1659,287 @@ TEST_CASE("hydration.s3.path_prefix")
CreateDirectories(ServerStateDir);
CreateDirectories(HydrationTemp);
- std::vector<std::pair<std::filesystem::path, IoBuffer>> TestFiles = CreateTestTree(ServerStateDir);
+ const std::string ModuleId = "s3test_performance";
+ CopyTree("E:\\Dev\\hub\\brainrot\\20260402-225355-508", ServerStateDir, {.EnableClone = true});
+ // auto TestFiles = CreateTestTree(ServerStateDir);
+
+ TestThreading Threading(4);
HydrationConfig Config;
Config.ServerStateDir = ServerStateDir;
Config.TempDir = HydrationTemp;
- Config.ModuleId = "s3test_prefix";
+ Config.ModuleId = ModuleId;
+ Config.Threading = Threading.Options;
+ std::string ConfigJson =
+ fmt::format(R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test","endpoint":"{}","path-style":true}}}})",
+ Minio.Endpoint());
+ std::string ParseError;
+ CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError);
+ ZEN_ASSERT(ParseError.empty() && Root.IsObject());
+ Config.Options = std::move(Root).AsObject();
+
+ // Dehydrate: upload server state to MinIO
{
- std::string ConfigJson =
- fmt::format(R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test/team/project","endpoint":"{}","path-style":true}}}})",
- Minio.Endpoint());
- std::string ParseError;
- CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError);
- ZEN_ASSERT(ParseError.empty() && Root.IsObject());
- Config.Options = std::move(Root).AsObject();
+ ZEN_INFO("============== DEHYDRATE ==============");
+ std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
+ Hydrator->Dehydrate(CbObject());
}
+ for (size_t I = 0; I < 1; I++)
{
- std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
- Hydrator->Dehydrate();
+ // Wipe server state
+ CleanDirectory(ServerStateDir, true);
+ CHECK(std::filesystem::is_empty(ServerStateDir));
+
+ // Hydrate: download from MinIO back to server state
+ {
+ ZEN_INFO("=============== HYDRATE ===============");
+ std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
+ Hydrator->Hydrate();
+ }
}
+}
- CleanDirectory(ServerStateDir, true);
+//#define REAL_DATA_PATH "E:\\Dev\\hub\\zenddc\\Zen"
+//#define REAL_DATA_PATH "E:\\Dev\\hub\\brainrot\\20260402-225355-508"
+
+TEST_CASE("hydration.file.incremental")
+{
+ std::filesystem::path TmpPath;
+# ifdef REAL_DATA_PATH
+ TmpPath = std::filesystem::path(REAL_DATA_PATH).parent_path() / "hub";
+# endif
+ ScopedTemporaryDirectory TempDir(TmpPath);
+
+ std::filesystem::path ServerStateDir = TempDir.Path() / "server_state";
+ std::filesystem::path HydrationStore = TempDir.Path() / "hydration_store";
+ std::filesystem::path HydrationTemp = TempDir.Path() / "hydration_temp";
+ CreateDirectories(ServerStateDir);
+ CreateDirectories(HydrationStore);
+ CreateDirectories(HydrationTemp);
+
+ const std::string ModuleId = "testmodule";
+ // auto TestFiles = CreateTestTree(ServerStateDir);
+
+ TestThreading Threading(4);
+ HydrationConfig Config;
+ Config.ServerStateDir = ServerStateDir;
+ Config.TempDir = HydrationTemp;
+ Config.ModuleId = ModuleId;
+ Config.TargetSpecification = "file://" + HydrationStore.string();
+ Config.Threading = Threading.Options;
+
+ std::unique_ptr<StorageBase> Storage = std::make_unique<FileStorage>();
+ std::unique_ptr<HydrationStrategyBase> Hydrator = std::make_unique<IncrementalHydrator>(std::move(Storage));
+
+ // Hydrate with no prior state
+ CbObject HydrationState;
{
- std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
- Hydrator->Hydrate();
+ Hydrator->Configure(Config);
+ HydrationState = Hydrator->Hydrate();
+ CHECK_FALSE(HydrationState);
}
+# ifdef REAL_DATA_PATH
+ ZEN_INFO("Writing state data...");
+ CopyTree(REAL_DATA_PATH, ServerStateDir, {.EnableClone = true});
+ ZEN_INFO("Writing state data complete");
+# else
+ // Create test files and dehydrate
+ auto TestFiles = CreateTestTree(ServerStateDir);
+# endif
+ {
+ Hydrator->Configure(Config);
+ Hydrator->Dehydrate(HydrationState);
+ }
+ CHECK(std::filesystem::is_empty(ServerStateDir));
+
+ // Hydrate: restore from S3
+ {
+ Hydrator->Configure(Config);
+ HydrationState = Hydrator->Hydrate();
+ }
+# ifndef REAL_DATA_PATH
VerifyTree(ServerStateDir, TestFiles);
+# endif
+ // Dehydrate again with cached state (should skip re-uploading unchanged files)
+ {
+ Hydrator->Configure(Config);
+ Hydrator->Dehydrate(HydrationState);
+ }
+ CHECK(std::filesystem::is_empty(ServerStateDir));
+
+ // Hydrate one more time to confirm second dehydrate produced valid state
+ {
+ Hydrator->Configure(Config);
+ HydrationState = Hydrator->Hydrate();
+ }
+
+ // Replace files and dehydrate
+ TestFiles = CreateTestTree(ServerStateDir);
+ {
+ Hydrator->Configure(Config);
+ Hydrator->Dehydrate(HydrationState);
+ }
+
+ // Hydrate one more time to confirm second dehydrate produced valid state
+ {
+ Hydrator->Configure(Config);
+ HydrationState = Hydrator->Hydrate();
+ }
+# ifndef REAL_DATA_PATH
+ VerifyTree(ServerStateDir, TestFiles);
+# endif // 0
+
+ // Dehydrate, nothing touched - no hashing, no upload
+ {
+ Hydrator->Configure(Config);
+ Hydrator->Dehydrate(HydrationState);
+ }
}
-TEST_CASE("hydration.s3.options_region_override")
-{
- // Verify that 'region' in Options["settings"] takes precedence over AWS_DEFAULT_REGION env var.
- // AWS_DEFAULT_REGION is set to a bogus value; hydration must succeed using the region from Options.
+// ---------------------------------------------------------------------------
+// S3Storage test
+// ---------------------------------------------------------------------------
+TEST_CASE("hydration.s3.incremental")
+{
MinioProcessOptions MinioOpts;
- MinioOpts.Port = 19016;
+ MinioOpts.Port = 19017;
MinioProcess Minio(MinioOpts);
Minio.SpawnMinioServer();
Minio.CreateBucket("zen-hydration-test");
ScopedEnvVar EnvAccessKey("AWS_ACCESS_KEY_ID", Minio.RootUser());
ScopedEnvVar EnvSecretKey("AWS_SECRET_ACCESS_KEY", Minio.RootPassword());
- ScopedEnvVar EnvRegion("AWS_DEFAULT_REGION", "wrong-region");
- ScopedTemporaryDirectory TempDir;
+ std::filesystem::path TmpPath;
+# ifdef REAL_DATA_PATH
+ TmpPath = std::filesystem::path(REAL_DATA_PATH).parent_path() / "hub";
+# endif
+ ScopedTemporaryDirectory TempDir(TmpPath);
std::filesystem::path ServerStateDir = TempDir.Path() / "server_state";
std::filesystem::path HydrationTemp = TempDir.Path() / "hydration_temp";
CreateDirectories(ServerStateDir);
CreateDirectories(HydrationTemp);
- auto TestFiles = CreateTestTree(ServerStateDir);
+ const std::string ModuleId = "s3test_incremental";
+
+ TestThreading Threading(8);
HydrationConfig Config;
Config.ServerStateDir = ServerStateDir;
Config.TempDir = HydrationTemp;
- Config.ModuleId = "s3test_region_override";
+ Config.ModuleId = ModuleId;
+ Config.Threading = Threading.Options;
{
- std::string ConfigJson = fmt::format(
- R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test","endpoint":"{}","path-style":true,"region":"us-east-1"}}}})",
- Minio.Endpoint());
+ std::string ConfigJson =
+ fmt::format(R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test","endpoint":"{}","path-style":true}}}})",
+ Minio.Endpoint());
std::string ParseError;
CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError);
ZEN_ASSERT(ParseError.empty() && Root.IsObject());
Config.Options = std::move(Root).AsObject();
}
+ std::unique_ptr<StorageBase> Storage = std::make_unique<S3Storage>();
+ std::unique_ptr<HydrationStrategyBase> Hydrator = std::make_unique<IncrementalHydrator>(std::move(Storage));
+
+ // Hydrate with no prior state
+ CbObject HydrationState;
{
- std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
- Hydrator->Dehydrate();
+ Hydrator->Configure(Config);
+ HydrationState = Hydrator->Hydrate();
+ CHECK_FALSE(HydrationState);
}
- CleanDirectory(ServerStateDir, true);
+# ifdef REAL_DATA_PATH
+ ZEN_INFO("Writing state data...");
+ CopyTree(REAL_DATA_PATH, ServerStateDir, {.EnableClone = true});
+ ZEN_INFO("Writing state data complete");
+# else
+ // Create test files and dehydrate
+ auto TestFiles = CreateTestTree(ServerStateDir);
+# endif
+ {
+ Hydrator->Configure(Config);
+ Hydrator->Dehydrate(HydrationState);
+ }
+ CHECK(std::filesystem::is_empty(ServerStateDir));
+ // Hydrate: restore from S3
{
- std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
- Hydrator->Hydrate();
+ Hydrator->Configure(Config);
+ HydrationState = Hydrator->Hydrate();
+ }
+# ifndef REAL_DATA_PATH
+ VerifyTree(ServerStateDir, TestFiles);
+# endif
+ // Dehydrate again with cached state (should skip re-uploading unchanged files)
+ {
+ Hydrator->Configure(Config);
+ Hydrator->Dehydrate(HydrationState);
+ }
+ CHECK(std::filesystem::is_empty(ServerStateDir));
+
+ // Hydrate one more time to confirm second dehydrate produced valid state
+ {
+ Hydrator->Configure(Config);
+ HydrationState = Hydrator->Hydrate();
+ }
+
+ // Replace files and dehydrate
+ TestFiles = CreateTestTree(ServerStateDir);
+ {
+ Hydrator->Configure(Config);
+ Hydrator->Dehydrate(HydrationState);
+ }
+
+ // Hydrate one more time to confirm second dehydrate produced valid state
+ {
+ Hydrator->Configure(Config);
+ HydrationState = Hydrator->Hydrate();
}
+# ifndef REAL_DATA_PATH
VerifyTree(ServerStateDir, TestFiles);
+# endif // 0
+
+ // Dehydrate, nothing touched - no hashing, no upload
+ {
+ Hydrator->Configure(Config);
+ Hydrator->Dehydrate(HydrationState);
+ }
+}
+
+TEST_CASE("hydration.create_hydrator_rejects_invalid_config")
+{
+ ScopedTemporaryDirectory TempDir;
+
+ HydrationConfig Config;
+ Config.ServerStateDir = TempDir.Path() / "state";
+ Config.TempDir = TempDir.Path() / "temp";
+ Config.ModuleId = "invalid_test";
+
+ // Unknown TargetSpecification prefix
+ Config.TargetSpecification = "ftp://somewhere";
+ CHECK_THROWS(CreateHydrator(Config));
+
+ // Unknown Options type
+ Config.TargetSpecification.clear();
+ {
+ std::string ParseError;
+ CbFieldIterator Root = LoadCompactBinaryFromJson(R"({"type":"dynamodb"})", ParseError);
+ ZEN_ASSERT(ParseError.empty() && Root.IsObject());
+ Config.Options = std::move(Root).AsObject();
+ }
+ CHECK_THROWS(CreateHydrator(Config));
+
+ // Empty Options (no type field)
+ Config.Options = CbObject();
+ CHECK_THROWS(CreateHydrator(Config));
}
TEST_SUITE_END();
diff --git a/src/zenserver/hub/hydration.h b/src/zenserver/hub/hydration.h
index 19a96c248..7edf5d996 100644
--- a/src/zenserver/hub/hydration.h
+++ b/src/zenserver/hub/hydration.h
@@ -5,9 +5,12 @@
#include <zencore/compactbinary.h>
#include <filesystem>
+#include <optional>
namespace zen {
+class WorkerThreadPool;
+
struct HydrationConfig
{
// Location of server state to hydrate/dehydrate
@@ -20,6 +23,16 @@ struct HydrationConfig
std::string TargetSpecification;
// Full config object when using --hub-hydration-target-config (mutually exclusive with TargetSpecification)
CbObject Options;
+
+ struct ThreadingOptions
+ {
+ WorkerThreadPool* WorkerPool;
+ std::atomic<bool>* AbortFlag;
+ std::atomic<bool>* PauseFlag;
+ };
+
+ // External threading for parallel I/O and hashing. If not set, work runs inline on the caller's thread.
+ std::optional<ThreadingOptions> Threading;
};
/**
@@ -34,11 +47,19 @@ struct HydrationStrategyBase
{
virtual ~HydrationStrategyBase() = default;
- virtual void Dehydrate() = 0;
- virtual void Hydrate() = 0;
+ // Set up the hydration target from Config. Must be called before Hydrate/Dehydrate.
virtual void Configure(const HydrationConfig& Config) = 0;
+
+ // Upload server state to the configured target. ServerStateDir is wiped on success.
+ // On failure, ServerStateDir is left intact.
+ virtual void Dehydrate(const CbObject& CachedState) = 0;
+
+ // Download state from the configured target into ServerStateDir. Returns cached state for the next Dehydrate.
+ // On failure, ServerStateDir is wiped and an empty CbObject is returned.
+ virtual CbObject Hydrate() = 0;
};
+// Create a configured hydrator based on Config. Ready to call Hydrate/Dehydrate immediately.
std::unique_ptr<HydrationStrategyBase> CreateHydrator(const HydrationConfig& Config);
#if ZEN_WITH_TESTS
diff --git a/src/zenserver/hub/storageserverinstance.cpp b/src/zenserver/hub/storageserverinstance.cpp
index 0c9354990..6185a7f19 100644
--- a/src/zenserver/hub/storageserverinstance.cpp
+++ b/src/zenserver/hub/storageserverinstance.cpp
@@ -154,29 +154,43 @@ StorageServerInstance::WakeLocked()
void
StorageServerInstance::Hydrate()
{
+ std::atomic<bool> AbortFlag{false};
+ std::atomic<bool> PauseFlag{false};
+
HydrationConfig Config{.ServerStateDir = m_BaseDir,
.TempDir = m_TempDir,
.ModuleId = m_ModuleId,
.TargetSpecification = m_Config.HydrationTargetSpecification,
.Options = m_Config.HydrationOptions};
+ if (m_Config.OptionalWorkerPool)
+ {
+ Config.Threading.emplace(
+ HydrationConfig::ThreadingOptions{.WorkerPool = m_Config.OptionalWorkerPool, .AbortFlag = &AbortFlag, .PauseFlag = &PauseFlag});
+ }
std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
-
- Hydrator->Hydrate();
+ m_HydrationState = Hydrator->Hydrate();
}
void
StorageServerInstance::Dehydrate()
{
+ std::atomic<bool> AbortFlag{false};
+ std::atomic<bool> PauseFlag{false};
+
HydrationConfig Config{.ServerStateDir = m_BaseDir,
.TempDir = m_TempDir,
.ModuleId = m_ModuleId,
.TargetSpecification = m_Config.HydrationTargetSpecification,
.Options = m_Config.HydrationOptions};
+ if (m_Config.OptionalWorkerPool)
+ {
+ Config.Threading.emplace(
+ HydrationConfig::ThreadingOptions{.WorkerPool = m_Config.OptionalWorkerPool, .AbortFlag = &AbortFlag, .PauseFlag = &PauseFlag});
+ }
std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
-
- Hydrator->Dehydrate();
+ Hydrator->Dehydrate(m_HydrationState);
}
StorageServerInstance::SharedLockedPtr::SharedLockedPtr() : m_Lock(nullptr), m_Instance(nullptr)
diff --git a/src/zenserver/hub/storageserverinstance.h b/src/zenserver/hub/storageserverinstance.h
index 1b0078d87..d8bebc48d 100644
--- a/src/zenserver/hub/storageserverinstance.h
+++ b/src/zenserver/hub/storageserverinstance.h
@@ -10,6 +10,8 @@
namespace zen {
+class WorkerThreadPool;
+
/**
* Storage Server Instance
*
@@ -29,6 +31,8 @@ public:
uint32_t HttpThreadCount = 0; // Automatic
int CoreLimit = 0; // Automatic
std::filesystem::path ConfigPath;
+
+ WorkerThreadPool* OptionalWorkerPool = nullptr;
};
StorageServerInstance(ZenServerEnvironment& RunEnvironment, const Configuration& Config, std::string_view ModuleId);
@@ -135,6 +139,8 @@ private:
std::filesystem::path m_TempDir;
+ CbObject m_HydrationState;
+
#if ZEN_PLATFORM_WINDOWS
JobObject* m_JobObject = nullptr;
#endif
diff --git a/src/zenserver/hub/zenhubserver.cpp b/src/zenserver/hub/zenhubserver.cpp
index d01e5f3f2..b94e04092 100644
--- a/src/zenserver/hub/zenhubserver.cpp
+++ b/src/zenserver/hub/zenhubserver.cpp
@@ -14,16 +14,17 @@
#include <zencore/except_fmt.h>
#include <zencore/filesystem.h>
#include <zencore/fmtutils.h>
+#include <zencore/intmath.h>
#include <zencore/memory/llm.h>
#include <zencore/memory/memorytrace.h>
#include <zencore/memory/tagtrace.h>
#include <zencore/scopeguard.h>
#include <zencore/sentryintegration.h>
#include <zencore/system.h>
+#include <zencore/thread.h>
#include <zencore/windows.h>
#include <zenhttp/httpapiservice.h>
#include <zenutil/service.h>
-#include <zenutil/workerpools.h>
ZEN_THIRD_PARTY_INCLUDES_START
#include <cxxopts.hpp>
@@ -152,6 +153,16 @@ ZenHubServerConfigurator::AddCliOptions(cxxopts::Options& Options)
cxxopts::value(m_ServerOptions.HubInstanceConfigPath),
"<instance config>");
+ const uint32_t DefaultHubInstanceProvisionThreadCount = Max(GetHardwareConcurrency() / 4u, 2u);
+
+ Options.add_option("hub",
+ "",
+ "hub-instance-provision-threads",
+ fmt::format("Number of threads for instance provisioning (default {})", DefaultHubInstanceProvisionThreadCount),
+ cxxopts::value<uint32_t>(m_ServerOptions.HubInstanceProvisionThreadCount)
+ ->default_value(fmt::format("{}", DefaultHubInstanceProvisionThreadCount)),
+ "<threads>");
+
Options.add_option("hub",
"",
"hub-hydration-target-spec",
@@ -168,6 +179,16 @@ ZenHubServerConfigurator::AddCliOptions(cxxopts::Options& Options)
cxxopts::value(m_ServerOptions.HydrationTargetConfigPath),
"<path>");
+ const uint32_t DefaultHubHydrationThreadCount = Max(GetHardwareConcurrency() / 4u, 2u);
+
+ Options.add_option(
+ "hub",
+ "",
+ "hub-hydration-threads",
+ fmt::format("Number of threads for hydration/dehydration (default {})", DefaultHubHydrationThreadCount),
+ cxxopts::value<uint32_t>(m_ServerOptions.HubHydrationThreadCount)->default_value(fmt::format("{}", DefaultHubHydrationThreadCount)),
+ "<threads>");
+
#if ZEN_PLATFORM_WINDOWS
Options.add_option("hub",
"",
@@ -299,9 +320,13 @@ ZenHubServerConfigurator::AddConfigOptions(LuaConfig::Options& Options)
Options.AddOption("hub.instance.limits.memorylimitpercent"sv,
m_ServerOptions.HubProvisionMemoryLimitPercent,
"hub-provision-memory-limit-percent"sv);
+ Options.AddOption("hub.instance.provisionthreads"sv,
+ m_ServerOptions.HubInstanceProvisionThreadCount,
+ "hub-instance-provision-threads"sv);
Options.AddOption("hub.hydration.targetspec"sv, m_ServerOptions.HydrationTargetSpecification, "hub-hydration-target-spec"sv);
Options.AddOption("hub.hydration.targetconfig"sv, m_ServerOptions.HydrationTargetConfigPath, "hub-hydration-target-config"sv);
+ Options.AddOption("hub.hydration.threads"sv, m_ServerOptions.HubHydrationThreadCount, "hub-hydration-threads"sv);
Options.AddOption("hub.watchdog.cycleintervalms"sv, m_ServerOptions.WatchdogConfig.CycleIntervalMs, "hub-watchdog-cycle-interval-ms"sv);
Options.AddOption("hub.watchdog.cycleprocessingbudgetms"sv,
@@ -468,6 +493,10 @@ ZenHubServer::Initialize(const ZenHubServerConfig& ServerConfig, ZenServerState:
// the main test range.
ZenServerEnvironment::SetBaseChildId(1000);
+ m_ProvisionWorkerPool =
+ std::make_unique<WorkerThreadPool>(gsl::narrow<int>(ServerConfig.HubInstanceProvisionThreadCount), "hub_provision");
+ m_HydrationWorkerPool = std::make_unique<WorkerThreadPool>(gsl::narrow<int>(ServerConfig.HubHydrationThreadCount), "hub_hydration");
+
m_DebugOptionForcedCrash = ServerConfig.ShouldCrash;
InitializeState(ServerConfig);
@@ -591,7 +620,9 @@ ZenHubServer::InitializeServices(const ZenHubServerConfig& ServerConfig)
.ActivityCheckConnectTimeout = std::chrono::milliseconds(ServerConfig.WatchdogConfig.ActivityCheckConnectTimeoutMs),
.ActivityCheckRequestTimeout = std::chrono::milliseconds(ServerConfig.WatchdogConfig.ActivityCheckRequestTimeoutMs),
},
- .ResourceLimits = ResolveLimits(ServerConfig)};
+ .ResourceLimits = ResolveLimits(ServerConfig),
+ .OptionalProvisionWorkerPool = m_ProvisionWorkerPool.get(),
+ .OptionalHydrationWorkerPool = m_HydrationWorkerPool.get()};
if (!ServerConfig.HydrationTargetConfigPath.empty())
{
@@ -624,7 +655,6 @@ ZenHubServer::InitializeServices(const ZenHubServerConfig& ServerConfig)
ServerConfig.DataDir / "hub",
ServerConfig.DataDir / "servers",
ServerConfig.HubInstanceHttpClass),
- &GetMediumWorkerPool(EWorkloadType::Background),
Hub::AsyncModuleStateChangeCallbackFunc{
[this, HubInstanceId = fmt::format("zen-hub-{}", ServerConfig.InstanceId)](std::string_view ModuleId,
const HubProvisionedInstanceInfo& Info,
diff --git a/src/zenserver/hub/zenhubserver.h b/src/zenserver/hub/zenhubserver.h
index d1add7690..22791a648 100644
--- a/src/zenserver/hub/zenhubserver.h
+++ b/src/zenserver/hub/zenhubserver.h
@@ -6,6 +6,7 @@
#include "resourcemetrics.h"
#include "zenserver.h"
+#include <zencore/workthreadpool.h>
#include <zenutil/consul.h>
namespace cxxopts {
@@ -40,16 +41,18 @@ struct ZenHubServerConfig : public ZenServerConfig
std::string InstanceId; // For use in notifications
std::string ConsulEndpoint; // If set, enables Consul service registration
std::string ConsulTokenEnv; // Environment variable name to read a Consul token from; defaults to CONSUL_HTTP_TOKEN if empty
- uint32_t ConsulHealthIntervalSeconds = 10; // Interval in seconds between Consul health checks
- uint32_t ConsulDeregisterAfterSeconds = 30; // Seconds before Consul deregisters an unhealthy service
- uint16_t HubBasePortNumber = 21000;
- int HubInstanceLimit = 1000;
- bool HubUseJobObject = true;
- std::string HubInstanceHttpClass = "asio";
- uint32_t HubInstanceHttpThreadCount = 0; // Automatic
- int HubInstanceCoreLimit = 0; // Automatic
- std::filesystem::path HubInstanceConfigPath; // Path to Lua config file
- std::string HydrationTargetSpecification; // hydration/dehydration target specification
+ uint32_t ConsulHealthIntervalSeconds = 10; // Interval in seconds between Consul health checks
+ uint32_t ConsulDeregisterAfterSeconds = 30; // Seconds before Consul deregisters an unhealthy service
+ uint16_t HubBasePortNumber = 21000;
+ int HubInstanceLimit = 1000;
+ bool HubUseJobObject = true;
+ std::string HubInstanceHttpClass = "asio";
+ uint32_t HubInstanceHttpThreadCount = 0; // Automatic
+ uint32_t HubInstanceProvisionThreadCount = 0; // Synchronous provisioning
+ uint32_t HubHydrationThreadCount = 0; // Synchronous hydration/dehydration
+ int HubInstanceCoreLimit = 0; // Automatic
+ std::filesystem::path HubInstanceConfigPath; // Path to Lua config file
+ std::string HydrationTargetSpecification; // hydration/dehydration target specification
std::filesystem::path HydrationTargetConfigPath; // path to JSON config file (mutually exclusive with HydrationTargetSpecification)
ZenHubWatchdogConfig WatchdogConfig;
uint64_t HubProvisionDiskLimitBytes = 0;
@@ -123,6 +126,8 @@ private:
bool m_DebugOptionForcedCrash = false;
std::unique_ptr<HttpProxyHandler> m_Proxy;
+ std::unique_ptr<WorkerThreadPool> m_ProvisionWorkerPool;
+ std::unique_ptr<WorkerThreadPool> m_HydrationWorkerPool;
std::unique_ptr<Hub> m_Hub;
std::unique_ptr<HttpHubService> m_HubService;