diff options
| author | Dan Engelbrecht <[email protected]> | 2026-04-07 16:53:55 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2026-04-07 16:53:55 +0200 |
| commit | 4d8fae7636ad45900f22253621b9f7d51d0b646e (patch) | |
| tree | 37fdf97870f216d465b4cb66563c5c366262483d /src/zenserver | |
| parent | disable zencompute in bundle step (diff) | |
| download | zen-4d8fae7636ad45900f22253621b9f7d51d0b646e.tar.xz zen-4d8fae7636ad45900f22253621b9f7d51d0b646e.zip | |
incremental dehydrate (#921)
- Feature: Incremental CAS-based hydration/dehydration replacing the previous full-copy approach
- Feature: S3 hydration backend with multipart upload/download support
- Feature: Configurable thread pools for hub instance provisioning and hydration
`--hub-instance-provision-threads` defaults to `max(cpu_count / 4, 2)`. Set to 0 for synchronous operation.
`--hub-hydration-threads` defaults to `max(cpu_count / 4, 2)`. Set to 0 for synchronous operation.
- Improvement: Hub triggers GC on instance before deprovisioning to compact storage before dehydration
- Improvement: GC status now reports pending triggers as running
- Improvement: S3 client debug logging gated behind verbose mode to reduce log noise at default verbosity
- Improvement: Hub dashboard Resources tile now shows total memory
- Improvement: `filesystemutils` moved from `zenremotestore` to `zenutil` for broader reuse
- Improvement: Hub uses separate provision and hydration worker pools to avoid deadlocks
- Improvement: Hibernate/wake/deprovision requests on non-existent or already-in-target-state modules are idempotent
- Improvement: `ScopedTemporaryDirectory` with an empty path now creates a temporary directory instead of asserting
Diffstat (limited to 'src/zenserver')
| -rw-r--r-- | src/zenserver/frontend/html/pages/hub.js | 1 | ||||
| -rw-r--r-- | src/zenserver/hub/hub.cpp | 432 | ||||
| -rw-r--r-- | src/zenserver/hub/hub.h | 10 | ||||
| -rw-r--r-- | src/zenserver/hub/hydration.cpp | 1916 | ||||
| -rw-r--r-- | src/zenserver/hub/hydration.h | 25 | ||||
| -rw-r--r-- | src/zenserver/hub/storageserverinstance.cpp | 22 | ||||
| -rw-r--r-- | src/zenserver/hub/storageserverinstance.h | 6 | ||||
| -rw-r--r-- | src/zenserver/hub/zenhubserver.cpp | 36 | ||||
| -rw-r--r-- | src/zenserver/hub/zenhubserver.h | 25 |
9 files changed, 1509 insertions, 964 deletions
diff --git a/src/zenserver/frontend/html/pages/hub.js b/src/zenserver/frontend/html/pages/hub.js index c9652f31e..1b84e46ec 100644 --- a/src/zenserver/frontend/html/pages/hub.js +++ b/src/zenserver/frontend/html/pages/hub.js @@ -241,6 +241,7 @@ export class Page extends ZenPage const right = columns.tag().classify("tile-metrics"); this._metric(right, Friendly.bytes(mem_used), "memory used", true); + this._metric(right, Friendly.bytes(machine.memory_total_mib * 1024 * 1024), "memory total"); if (mem_limit > 0) { this._metric(right, Friendly.bytes(mem_limit), "memory limit"); } if (machine.virtual_memory_total_mib > 0) { diff --git a/src/zenserver/hub/hub.cpp b/src/zenserver/hub/hub.cpp index 82f4a00ba..b2ebcd16f 100644 --- a/src/zenserver/hub/hub.cpp +++ b/src/zenserver/hub/hub.cpp @@ -170,18 +170,19 @@ Hub::GetMachineMetrics(SystemMetrics& OutSystemMetrict, DiskSpace& OutDiskSpace) ////////////////////////////////////////////////////////////////////////// -Hub::Hub(const Configuration& Config, - ZenServerEnvironment&& RunEnvironment, - WorkerThreadPool* OptionalWorkerPool, - AsyncModuleStateChangeCallbackFunc&& ModuleStateChangeCallback) +Hub::Hub(const Configuration& Config, ZenServerEnvironment&& RunEnvironment, AsyncModuleStateChangeCallbackFunc&& ModuleStateChangeCallback) : m_Config(Config) , m_RunEnvironment(std::move(RunEnvironment)) -, m_WorkerPool(OptionalWorkerPool) +, m_WorkerPool(Config.OptionalProvisionWorkerPool) , m_BackgroundWorkLatch(1) , m_ModuleStateChangeCallback(std::move(ModuleStateChangeCallback)) , m_ActiveInstances(Config.InstanceLimit) , m_FreeActiveInstanceIndexes(Config.InstanceLimit) { + ZEN_ASSERT_FORMAT( + Config.OptionalProvisionWorkerPool != Config.OptionalHydrationWorkerPool || Config.OptionalProvisionWorkerPool == nullptr, + "Provision and hydration worker pools must be distinct to avoid deadlocks"); + if (!m_Config.HydrationTargetSpecification.empty()) { m_HydrationTargetSpecification = m_Config.HydrationTargetSpecification; @@ 
-329,7 +330,8 @@ Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo) .HydrationOptions = m_HydrationOptions, .HttpThreadCount = m_Config.InstanceHttpThreadCount, .CoreLimit = m_Config.InstanceCoreLimit, - .ConfigPath = m_Config.InstanceConfigPath}, + .ConfigPath = m_Config.InstanceConfigPath, + .OptionalWorkerPool = m_Config.OptionalHydrationWorkerPool}, ModuleId); #if ZEN_PLATFORM_WINDOWS @@ -639,11 +641,11 @@ Hub::InternalDeprovision(const std::string& ModuleId, std::function<bool(ActiveI try { m_WorkerPool->ScheduleWork( - [this, ModuleId = std::string(ModuleId), ActiveInstanceIndex, Instance = std::move(SharedInstancePtr)]() mutable { + [this, ModuleId = std::string(ModuleId), ActiveInstanceIndex, Instance = std::move(SharedInstancePtr), OldState]() mutable { auto _ = MakeGuard([this]() { m_BackgroundWorkLatch.CountDown(); }); try { - CompleteDeprovision(*Instance, ActiveInstanceIndex); + CompleteDeprovision(*Instance, ActiveInstanceIndex, OldState); } catch (const std::exception& Ex) { @@ -671,20 +673,47 @@ Hub::InternalDeprovision(const std::string& ModuleId, std::function<bool(ActiveI } else { - CompleteDeprovision(Instance, ActiveInstanceIndex); + CompleteDeprovision(Instance, ActiveInstanceIndex, OldState); } return Response{m_WorkerPool ? 
EResponseCode::Accepted : EResponseCode::Completed}; } void -Hub::CompleteDeprovision(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex) +Hub::CompleteDeprovision(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex, HubInstanceState OldState) { const std::string ModuleId(Instance.GetModuleId()); const uint16_t Port = Instance.GetBasePort(); try { + { + if (OldState == HubInstanceState::Provisioned) + { + ZEN_INFO("Triggering GC for module {}", ModuleId); + + HttpClient GcClient(fmt::format("http://localhost:{}", Port)); + + HttpClient::KeyValueMap Params; + Params.Entries.insert({"smallobjects", "true"}); + Params.Entries.insert({"skipcid", "false"}); + HttpClient::Response Response = GcClient.Post("/admin/gc", HttpClient::Accept(HttpContentType::kCbObject), Params); + Stopwatch Timer; + while (Response && Timer.GetElapsedTimeMs() < 5000) + { + Response = GcClient.Get("/admin/gc", HttpClient::Accept(HttpContentType::kCbObject)); + if (Response) + { + bool Complete = Response.AsObject()["Status"].AsString() != "Running"; + if (Complete) + { + break; + } + Sleep(50); + } + } + } + } Instance.Deprovision(); } catch (const std::exception& Ex) @@ -1139,10 +1168,14 @@ Hub::AttemptRecoverInstance(std::string_view ModuleId) { StorageServerInstance::ExclusiveLockedPtr Instance; size_t ActiveInstanceIndex = (size_t)-1; - { RwLock::ExclusiveLockScope _(m_Lock); + if (m_ShutdownFlag.load()) + { + return; + } + auto It = m_InstanceLookup.find(std::string(ModuleId)); if (It == m_InstanceLookup.end()) { @@ -1526,6 +1559,14 @@ static const HttpClientSettings kFastTimeout{.ConnectTimeout = std::chrono::mill namespace hub_testutils { + struct TestHubPools + { + WorkerThreadPool ProvisionPool; + WorkerThreadPool HydrationPool; + + explicit TestHubPools(int ThreadCount) : ProvisionPool(ThreadCount, "hub_test_prov"), HydrationPool(ThreadCount, "hub_test_hydr") {} + }; + ZenServerEnvironment MakeHubEnvironment(const 
std::filesystem::path& BaseDir) { return ZenServerEnvironment(ZenServerEnvironment::Hub, GetRunningExecutablePath().parent_path(), BaseDir); @@ -1534,9 +1575,14 @@ namespace hub_testutils { std::unique_ptr<Hub> MakeHub(const std::filesystem::path& BaseDir, Hub::Configuration Config = {}, Hub::AsyncModuleStateChangeCallbackFunc StateChangeCallback = {}, - WorkerThreadPool* WorkerPool = nullptr) + TestHubPools* Pools = nullptr) { - return std::make_unique<Hub>(Config, MakeHubEnvironment(BaseDir), WorkerPool, std::move(StateChangeCallback)); + if (Pools) + { + Config.OptionalProvisionWorkerPool = &Pools->ProvisionPool; + Config.OptionalHydrationWorkerPool = &Pools->HydrationPool; + } + return std::make_unique<Hub>(Config, MakeHubEnvironment(BaseDir), std::move(StateChangeCallback)); } struct CallbackRecord @@ -1608,14 +1654,32 @@ namespace hub_testutils { } // namespace hub_testutils -TEST_CASE("hub.provision_basic") +TEST_CASE("hub.provision") { ScopedTemporaryDirectory TempDir; - std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path()); + + struct TransitionRecord + { + HubInstanceState OldState; + HubInstanceState NewState; + }; + RwLock CaptureMutex; + std::vector<TransitionRecord> Transitions; + + hub_testutils::StateChangeCapture CaptureInstance; + + auto CaptureFunc = + [&](std::string_view ModuleId, const HubProvisionedInstanceInfo& Info, HubInstanceState OldState, HubInstanceState NewState) { + CaptureMutex.WithExclusiveLock([&]() { Transitions.push_back({OldState, NewState}); }); + CaptureInstance.CaptureFunc()(ModuleId, Info, OldState, NewState); + }; + + std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), {}, std::move(CaptureFunc)); CHECK_EQ(HubInstance->GetInstanceCount(), 0); CHECK_FALSE(HubInstance->Find("module_a")); + // Provision HubProvisionedInstanceInfo Info; const Hub::Response ProvisionResult = HubInstance->Provision("module_a", Info); REQUIRE_MESSAGE(ProvisionResult.ResponseCode == 
Hub::EResponseCode::Completed, ProvisionResult.Message); @@ -1632,6 +1696,15 @@ TEST_CASE("hub.provision_basic") CHECK(ModClient.Get("/health/")); } + // Verify provision callback + { + RwLock::SharedLockScope _(CaptureInstance.CallbackMutex); + REQUIRE_EQ(CaptureInstance.ProvisionCallbacks.size(), 1u); + CHECK_EQ(CaptureInstance.ProvisionCallbacks[0].ModuleId, "module_a"); + CHECK_EQ(CaptureInstance.ProvisionCallbacks[0].Port, Info.Port); + } + + // Deprovision const Hub::Response DeprovisionResult = HubInstance->Deprovision("module_a"); CHECK(DeprovisionResult.ResponseCode == Hub::EResponseCode::Completed); CHECK_EQ(HubInstance->GetInstanceCount(), 0); @@ -1641,6 +1714,28 @@ TEST_CASE("hub.provision_basic") HttpClient ModClient(fmt::format("http://localhost:{}", Info.Port), kFastTimeout); CHECK(!ModClient.Get("/health/")); } + + // Verify deprovision callback + { + RwLock::SharedLockScope _(CaptureInstance.CallbackMutex); + REQUIRE_EQ(CaptureInstance.DeprovisionCallbacks.size(), 1u); + CHECK_EQ(CaptureInstance.DeprovisionCallbacks[0].ModuleId, "module_a"); + CHECK_EQ(CaptureInstance.DeprovisionCallbacks[0].Port, Info.Port); + } + + // Verify full transition sequence + { + RwLock::SharedLockScope _(CaptureMutex); + REQUIRE_EQ(Transitions.size(), 4u); + CHECK_EQ(Transitions[0].OldState, HubInstanceState::Unprovisioned); + CHECK_EQ(Transitions[0].NewState, HubInstanceState::Provisioning); + CHECK_EQ(Transitions[1].OldState, HubInstanceState::Provisioning); + CHECK_EQ(Transitions[1].NewState, HubInstanceState::Provisioned); + CHECK_EQ(Transitions[2].OldState, HubInstanceState::Provisioned); + CHECK_EQ(Transitions[2].NewState, HubInstanceState::Deprovisioning); + CHECK_EQ(Transitions[3].OldState, HubInstanceState::Deprovisioning); + CHECK_EQ(Transitions[3].NewState, HubInstanceState::Unprovisioned); + } } TEST_CASE("hub.provision_config") @@ -1693,92 +1788,6 @@ TEST_CASE("hub.provision_config") } } -TEST_CASE("hub.provision_callbacks") -{ - ScopedTemporaryDirectory 
TempDir; - - hub_testutils::StateChangeCapture CaptureInstance; - - std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), {}, CaptureInstance.CaptureFunc()); - - HubProvisionedInstanceInfo Info; - - const Hub::Response ProvisionResult = HubInstance->Provision("cb_module", Info); - REQUIRE_MESSAGE(ProvisionResult.ResponseCode == Hub::EResponseCode::Completed, ProvisionResult.Message); - - { - RwLock::SharedLockScope _(CaptureInstance.CallbackMutex); - REQUIRE_EQ(CaptureInstance.ProvisionCallbacks.size(), 1u); - CHECK_EQ(CaptureInstance.ProvisionCallbacks[0].ModuleId, "cb_module"); - CHECK_EQ(CaptureInstance.ProvisionCallbacks[0].Port, Info.Port); - CHECK_NE(CaptureInstance.ProvisionCallbacks[0].Port, 0); - } - - { - HttpClient ModClient(fmt::format("http://localhost:{}", Info.Port), kFastTimeout); - CHECK(ModClient.Get("/health/")); - } - - const Hub::Response DeprovisionResult = HubInstance->Deprovision("cb_module"); - CHECK(DeprovisionResult.ResponseCode == Hub::EResponseCode::Completed); - - { - HttpClient ModClient(fmt::format("http://localhost:{}", Info.Port), kFastTimeout); - CHECK(!ModClient.Get("/health/")); - } - - { - RwLock::SharedLockScope _(CaptureInstance.CallbackMutex); - REQUIRE_EQ(CaptureInstance.DeprovisionCallbacks.size(), 1u); - CHECK_EQ(CaptureInstance.DeprovisionCallbacks[0].ModuleId, "cb_module"); - CHECK_EQ(CaptureInstance.DeprovisionCallbacks[0].Port, Info.Port); - CHECK_EQ(CaptureInstance.DeprovisionCallbacks.size(), 1u); - } -} - -TEST_CASE("hub.provision_callback_sequence") -{ - ScopedTemporaryDirectory TempDir; - - struct TransitionRecord - { - HubInstanceState OldState; - HubInstanceState NewState; - }; - RwLock CaptureMutex; - std::vector<TransitionRecord> Transitions; - - auto CaptureFunc = - [&](std::string_view ModuleId, const HubProvisionedInstanceInfo& Info, HubInstanceState OldState, HubInstanceState NewState) { - ZEN_UNUSED(ModuleId); - ZEN_UNUSED(Info); - CaptureMutex.WithExclusiveLock([&]() { 
Transitions.push_back({OldState, NewState}); }); - }; - - std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), {}, std::move(CaptureFunc)); - - HubProvisionedInstanceInfo Info; - { - const Hub::Response R = HubInstance->Provision("seq_module", Info); - REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message); - } - { - const Hub::Response R = HubInstance->Deprovision("seq_module"); - REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message); - } - - RwLock::SharedLockScope _(CaptureMutex); - REQUIRE_EQ(Transitions.size(), 4u); - CHECK_EQ(Transitions[0].OldState, HubInstanceState::Unprovisioned); - CHECK_EQ(Transitions[0].NewState, HubInstanceState::Provisioning); - CHECK_EQ(Transitions[1].OldState, HubInstanceState::Provisioning); - CHECK_EQ(Transitions[1].NewState, HubInstanceState::Provisioned); - CHECK_EQ(Transitions[2].OldState, HubInstanceState::Provisioned); - CHECK_EQ(Transitions[2].NewState, HubInstanceState::Deprovisioning); - CHECK_EQ(Transitions[3].OldState, HubInstanceState::Deprovisioning); - CHECK_EQ(Transitions[3].NewState, HubInstanceState::Unprovisioned); -} - TEST_CASE("hub.instance_limit") { ScopedTemporaryDirectory TempDir; @@ -1810,54 +1819,7 @@ TEST_CASE("hub.instance_limit") CHECK_EQ(HubInstance->GetInstanceCount(), 2); } -TEST_CASE("hub.enumerate_modules") -{ - ScopedTemporaryDirectory TempDir; - std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path()); - - HubProvisionedInstanceInfo Info; - - { - const Hub::Response R = HubInstance->Provision("enum_a", Info); - REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message); - } - { - const Hub::Response R = HubInstance->Provision("enum_b", Info); - REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message); - } - - std::vector<std::string> Ids; - int ProvisionedCount = 0; - HubInstance->EnumerateModules([&](std::string_view ModuleId, const Hub::InstanceInfo& InstanceInfo) { - 
Ids.push_back(std::string(ModuleId)); - if (InstanceInfo.State == HubInstanceState::Provisioned) - { - ProvisionedCount++; - } - }); - CHECK_EQ(Ids.size(), 2u); - CHECK_EQ(ProvisionedCount, 2); - const bool FoundA = std::find(Ids.begin(), Ids.end(), "enum_a") != Ids.end(); - const bool FoundB = std::find(Ids.begin(), Ids.end(), "enum_b") != Ids.end(); - CHECK(FoundA); - CHECK(FoundB); - - HubInstance->Deprovision("enum_a"); - Ids.clear(); - ProvisionedCount = 0; - HubInstance->EnumerateModules([&](std::string_view ModuleId, const Hub::InstanceInfo& InstanceInfo) { - Ids.push_back(std::string(ModuleId)); - if (InstanceInfo.State == HubInstanceState::Provisioned) - { - ProvisionedCount++; - } - }); - REQUIRE_EQ(Ids.size(), 1u); - CHECK_EQ(Ids[0], "enum_b"); - CHECK_EQ(ProvisionedCount, 1); -} - -TEST_CASE("hub.max_instance_count") +TEST_CASE("hub.enumerate_and_instance_tracking") { ScopedTemporaryDirectory TempDir; std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path()); @@ -1867,22 +1829,56 @@ TEST_CASE("hub.max_instance_count") HubProvisionedInstanceInfo Info; { - const Hub::Response R = HubInstance->Provision("max_a", Info); + const Hub::Response R = HubInstance->Provision("track_a", Info); REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message); } CHECK_GE(HubInstance->GetMaxInstanceCount(), 1); { - const Hub::Response R = HubInstance->Provision("max_b", Info); + const Hub::Response R = HubInstance->Provision("track_b", Info); REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message); } CHECK_GE(HubInstance->GetMaxInstanceCount(), 2); + // Enumerate both modules + { + std::vector<std::string> Ids; + int ProvisionedCount = 0; + HubInstance->EnumerateModules([&](std::string_view ModuleId, const Hub::InstanceInfo& InstanceInfo) { + Ids.push_back(std::string(ModuleId)); + if (InstanceInfo.State == HubInstanceState::Provisioned) + { + ProvisionedCount++; + } + }); + CHECK_EQ(Ids.size(), 2u); + 
CHECK_EQ(ProvisionedCount, 2); + CHECK(std::find(Ids.begin(), Ids.end(), "track_a") != Ids.end()); + CHECK(std::find(Ids.begin(), Ids.end(), "track_b") != Ids.end()); + } + const int MaxAfterTwo = HubInstance->GetMaxInstanceCount(); - HubInstance->Deprovision("max_a"); + // Deprovision one - max instance count must not decrease + HubInstance->Deprovision("track_a"); CHECK_EQ(HubInstance->GetInstanceCount(), 1); CHECK_EQ(HubInstance->GetMaxInstanceCount(), MaxAfterTwo); + + // Enumerate after deprovision + { + std::vector<std::string> Ids; + int ProvisionedCount = 0; + HubInstance->EnumerateModules([&](std::string_view ModuleId, const Hub::InstanceInfo& InstanceInfo) { + Ids.push_back(std::string(ModuleId)); + if (InstanceInfo.State == HubInstanceState::Provisioned) + { + ProvisionedCount++; + } + }); + REQUIRE_EQ(Ids.size(), 1u); + CHECK_EQ(Ids[0], "track_b"); + CHECK_EQ(ProvisionedCount, 1); + } } TEST_CASE("hub.concurrent_callbacks") @@ -2038,6 +2034,11 @@ TEST_CASE("hub.hibernate_wake") HubProvisionedInstanceInfo ProvInfo; Hub::InstanceInfo Info; + // Error cases on non-existent modules (no provision needed) + CHECK(HubInstance->Hibernate("never_provisioned").ResponseCode == Hub::EResponseCode::NotFound); + CHECK(HubInstance->Wake("never_provisioned").ResponseCode == Hub::EResponseCode::NotFound); + CHECK(HubInstance->Deprovision("never_provisioned").ResponseCode == Hub::EResponseCode::NotFound); + // Provision { const Hub::Response R = HubInstance->Provision("hib_a", ProvInfo); @@ -2053,9 +2054,14 @@ TEST_CASE("hub.hibernate_wake") CHECK(ModClient.Get("/health/")); } + // Double-wake on provisioned module is idempotent + CHECK(HubInstance->Wake("hib_a").ResponseCode == Hub::EResponseCode::Completed); + // Hibernate - const Hub::Response HibernateResult = HubInstance->Hibernate("hib_a"); - REQUIRE_MESSAGE(HibernateResult.ResponseCode == Hub::EResponseCode::Completed, HibernateResult.Message); + { + const Hub::Response R = HubInstance->Hibernate("hib_a"); + 
REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message); + } REQUIRE(HubInstance->Find("hib_a", &Info)); CHECK_EQ(Info.State, HubInstanceState::Hibernated); const std::chrono::system_clock::time_point HibernatedTime = Info.StateChangeTime; @@ -2065,9 +2071,14 @@ TEST_CASE("hub.hibernate_wake") CHECK(!ModClient.Get("/health/")); } + // Double-hibernate on already-hibernated module is idempotent + CHECK(HubInstance->Hibernate("hib_a").ResponseCode == Hub::EResponseCode::Completed); + // Wake - const Hub::Response WakeResult = HubInstance->Wake("hib_a"); - REQUIRE_MESSAGE(WakeResult.ResponseCode == Hub::EResponseCode::Completed, WakeResult.Message); + { + const Hub::Response R = HubInstance->Wake("hib_a"); + REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message); + } REQUIRE(HubInstance->Find("hib_a", &Info)); CHECK_EQ(Info.State, HubInstanceState::Provisioned); CHECK_GE(Info.StateChangeTime, HibernatedTime); @@ -2086,49 +2097,6 @@ TEST_CASE("hub.hibernate_wake") } } -TEST_CASE("hub.hibernate_wake_errors") -{ - ScopedTemporaryDirectory TempDir; - Hub::Configuration Config; - Config.BasePortNumber = 22700; - std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config); - - HubProvisionedInstanceInfo ProvInfo; - - // Hibernate/wake on a non-existent module - returns NotFound (-> 404) - CHECK(HubInstance->Hibernate("never_provisioned").ResponseCode == Hub::EResponseCode::NotFound); - CHECK(HubInstance->Wake("never_provisioned").ResponseCode == Hub::EResponseCode::NotFound); - - // Double-hibernate: second hibernate on already-hibernated module returns Completed (idempotent) - { - const Hub::Response R = HubInstance->Provision("err_b", ProvInfo); - REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message); - } - { - const Hub::Response R = HubInstance->Hibernate("err_b"); - REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message); - } - - { - const Hub::Response HibResp = 
HubInstance->Hibernate("err_b"); - CHECK(HibResp.ResponseCode == Hub::EResponseCode::Completed); - } - - // Wake on provisioned: succeeds (-> Provisioned), then wake again returns Completed (idempotent) - { - const Hub::Response R = HubInstance->Wake("err_b"); - REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message); - } - - { - const Hub::Response WakeResp = HubInstance->Wake("err_b"); - CHECK(WakeResp.ResponseCode == Hub::EResponseCode::Completed); - } - - // Deprovision not-found - returns NotFound (-> 404) - CHECK(HubInstance->Deprovision("never_provisioned").ResponseCode == Hub::EResponseCode::NotFound); -} - TEST_CASE("hub.async_hibernate_wake") { ScopedTemporaryDirectory TempDir; @@ -2136,8 +2104,8 @@ TEST_CASE("hub.async_hibernate_wake") Hub::Configuration Config; Config.BasePortNumber = 23000; - WorkerThreadPool WorkerPool(2, "hub_async_hib_wake"); - std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config, {}, &WorkerPool); + hub_testutils::TestHubPools Pools(2); + std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config, {}, &Pools); HubProvisionedInstanceInfo ProvInfo; Hub::InstanceInfo Info; @@ -2267,25 +2235,21 @@ TEST_CASE("hub.recover_process_crash") if (HubInstance->Find("module_a", &InstanceInfo) && InstanceInfo.State == HubInstanceState::Provisioned && ModClient.Get("/health/")) { - // Recovery must reuse the same port - the instance was never removed from the hub's - // port table during recovery, so AttemptRecoverInstance reuses m_Config.BasePort. 
CHECK_EQ(InstanceInfo.Port, Info.Port); Recovered = true; break; } } - CHECK_MESSAGE(Recovered, "Instance did not recover within timeout"); + REQUIRE_MESSAGE(Recovered, "Instance did not recover within timeout"); // Verify the full crash/recovery callback sequence { RwLock::SharedLockScope _(CaptureMutex); REQUIRE_GE(Transitions.size(), 3u); - // Find the Provisioned->Crashed transition const auto CrashedIt = std::find_if(Transitions.begin(), Transitions.end(), [](const TransitionRecord& R) { return R.OldState == HubInstanceState::Provisioned && R.NewState == HubInstanceState::Crashed; }); REQUIRE_NE(CrashedIt, Transitions.end()); - // Recovery sequence follows: Crashed->Recovering, Recovering->Provisioned const auto RecoveringIt = CrashedIt + 1; REQUIRE_NE(RecoveringIt, Transitions.end()); CHECK_EQ(RecoveringIt->OldState, HubInstanceState::Crashed); @@ -2295,44 +2259,6 @@ TEST_CASE("hub.recover_process_crash") CHECK_EQ(RecoveredIt->OldState, HubInstanceState::Recovering); CHECK_EQ(RecoveredIt->NewState, HubInstanceState::Provisioned); } -} - -TEST_CASE("hub.recover_process_crash_then_deprovision") -{ - ScopedTemporaryDirectory TempDir; - - // Fast watchdog cycle so crash detection is near-instant instead of waiting up to the 3s default. - Hub::Configuration Config; - Config.WatchDog.CycleInterval = std::chrono::milliseconds(10); - Config.WatchDog.InstanceCheckThrottle = std::chrono::milliseconds(1); - - std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config); - - HubProvisionedInstanceInfo Info; - { - const Hub::Response R = HubInstance->Provision("module_a", Info); - REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message); - } - - // Kill the child process, wait for the watchdog to detect and recover the instance. 
- HubInstance->TerminateModuleForTesting("module_a"); - - constexpr auto kPollIntervalMs = std::chrono::milliseconds(50); - constexpr auto kTimeoutMs = std::chrono::seconds(15); - const auto Deadline = std::chrono::steady_clock::now() + kTimeoutMs; - - bool Recovered = false; - while (std::chrono::steady_clock::now() < Deadline) - { - std::this_thread::sleep_for(kPollIntervalMs); - Hub::InstanceInfo InstanceInfo; - if (HubInstance->Find("module_a", &InstanceInfo) && InstanceInfo.State == HubInstanceState::Provisioned) - { - Recovered = true; - break; - } - } - REQUIRE_MESSAGE(Recovered, "Instance did not recover within timeout"); // After recovery, deprovision should succeed and a re-provision should work. { @@ -2361,8 +2287,8 @@ TEST_CASE("hub.async_provision_concurrent") Config.BasePortNumber = 22800; Config.InstanceLimit = kModuleCount; - WorkerThreadPool WorkerPool(4, "hub_async_concurrent"); - std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config, {}, &WorkerPool); + hub_testutils::TestHubPools Pools(4); + std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config, {}, &Pools); std::vector<HubProvisionedInstanceInfo> Infos(kModuleCount); std::vector<std::string> Reasons(kModuleCount); @@ -2443,8 +2369,8 @@ TEST_CASE("hub.async_provision_shutdown_waits") Config.InstanceLimit = kModuleCount; Config.BasePortNumber = 22900; - WorkerThreadPool WorkerPool(2, "hub_async_shutdown"); - std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config, {}, &WorkerPool); + hub_testutils::TestHubPools Pools(2); + std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config, {}, &Pools); std::vector<HubProvisionedInstanceInfo> Infos(kModuleCount); @@ -2476,8 +2402,8 @@ TEST_CASE("hub.async_provision_rejected") Config.InstanceLimit = 1; Config.BasePortNumber = 23100; - WorkerThreadPool WorkerPool(2, "hub_async_rejected"); - std::unique_ptr<Hub> HubInstance = 
hub_testutils::MakeHub(TempDir.Path(), Config, {}, &WorkerPool); + hub_testutils::TestHubPools Pools(2); + std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config, {}, &Pools); HubProvisionedInstanceInfo Info; @@ -2565,12 +2491,12 @@ TEST_CASE("hub.instance.inactivity.deprovision") // Phase 1: immediately after setup all three instances must still be alive. // No timeout has elapsed yet (only 100ms have passed). - CHECK_MESSAGE(HubInstance->Find("idle"), "idle was deprovisioned within 100ms - its 2s provisioned timeout has not elapsed"); + CHECK_MESSAGE(HubInstance->Find("idle"), "idle was deprovisioned within 100ms - its 4s provisioned timeout has not elapsed"); CHECK_MESSAGE(HubInstance->Find("idle_hib"), "idle_hib was deprovisioned within 100ms - its 1s hibernated timeout has not elapsed"); CHECK_MESSAGE(HubInstance->Find("persistent"), - "persistent was deprovisioned within 100ms - its 2s provisioned timeout has not elapsed"); + "persistent was deprovisioned within 100ms - its 4s provisioned timeout has not elapsed"); // Phase 2: idle_hib must be deprovisioned by the watchdog within its 1s hibernated timeout. // idle must remain alive - its 2s provisioned timeout has not elapsed yet. 
@@ -2594,7 +2520,7 @@ TEST_CASE("hub.instance.inactivity.deprovision") CHECK_MESSAGE(!HubInstance->Find("idle_hib"), "idle_hib should still be gone - it was deprovisioned in phase 2"); - CHECK_MESSAGE(!HubInstance->Find("idle"), "idle should be gone after its 3s provisioned timeout elapsed"); + CHECK_MESSAGE(!HubInstance->Find("idle"), "idle should be gone after its 4s provisioned timeout elapsed"); CHECK_MESSAGE(HubInstance->Find("persistent"), "persistent was incorrectly deprovisioned - its activity timer was reset by PokeInstance"); diff --git a/src/zenserver/hub/hub.h b/src/zenserver/hub/hub.h index 8ee9130f6..071b14f35 100644 --- a/src/zenserver/hub/hub.h +++ b/src/zenserver/hub/hub.h @@ -73,6 +73,9 @@ public: WatchDogConfiguration WatchDog; ResourceMetrics ResourceLimits; + + WorkerThreadPool* OptionalProvisionWorkerPool = nullptr; + WorkerThreadPool* OptionalHydrationWorkerPool = nullptr; }; typedef std::function< @@ -81,7 +84,6 @@ public: Hub(const Configuration& Config, ZenServerEnvironment&& RunEnvironment, - WorkerThreadPool* OptionalWorkerPool = nullptr, AsyncModuleStateChangeCallbackFunc&& ModuleStateChangeCallback = {}); ~Hub(); @@ -278,9 +280,9 @@ private: size_t ActiveInstanceIndex, HubInstanceState OldState, bool IsNewInstance); - void CompleteDeprovision(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex); - void CompleteHibernate(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex, HubInstanceState OldState); - void CompleteWake(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex, HubInstanceState OldState); + void CompleteDeprovision(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex, HubInstanceState OldState); + void CompleteHibernate(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex, HubInstanceState OldState); + void CompleteWake(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t 
ActiveInstanceIndex, HubInstanceState OldState); // Notifications may fire slightly out of sync with the Hub's internal State flag. // The guarantee is that notifications are sent in the correct order, but the State diff --git a/src/zenserver/hub/hydration.cpp b/src/zenserver/hub/hydration.cpp index ed16bfe56..cf36d8646 100644 --- a/src/zenserver/hub/hydration.cpp +++ b/src/zenserver/hub/hydration.cpp @@ -5,21 +5,25 @@ #include <zencore/basicfile.h> #include <zencore/compactbinary.h> #include <zencore/compactbinarybuilder.h> +#include <zencore/compactbinaryutil.h> +#include <zencore/compress.h> #include <zencore/except_fmt.h> #include <zencore/filesystem.h> #include <zencore/fmtutils.h> #include <zencore/logging.h> +#include <zencore/parallelwork.h> +#include <zencore/stream.h> #include <zencore/system.h> #include <zencore/timer.h> #include <zenutil/cloud/imdscredentials.h> #include <zenutil/cloud/s3client.h> +#include <zenutil/filesystemutils.h> -ZEN_THIRD_PARTY_INCLUDES_START -#include <json11.hpp> -ZEN_THIRD_PARTY_INCLUDES_END +#include <numeric> +#include <unordered_map> +#include <unordered_set> #if ZEN_WITH_TESTS -# include <zencore/parallelwork.h> # include <zencore/testing.h> # include <zencore/testutils.h> # include <zencore/thread.h> @@ -30,7 +34,7 @@ ZEN_THIRD_PARTY_INCLUDES_END namespace zen { -namespace { +namespace hydration_impl { /// UTC time decomposed to calendar fields with sub-second milliseconds. 
struct UtcTime @@ -56,609 +60,992 @@ namespace { } }; -} // namespace - -/////////////////////////////////////////////////////////////////////////// - -constexpr std::string_view FileHydratorPrefix = "file://"; -constexpr std::string_view FileHydratorType = "file"; - -struct FileHydrator : public HydrationStrategyBase -{ - virtual void Configure(const HydrationConfig& Config) override; - virtual void Hydrate() override; - virtual void Dehydrate() override; - -private: - HydrationConfig m_Config; - std::filesystem::path m_StorageModuleRootDir; -}; - -void -FileHydrator::Configure(const HydrationConfig& Config) -{ - m_Config = Config; - - std::filesystem::path ConfigPath; - if (!m_Config.TargetSpecification.empty()) + std::filesystem::path FastRelativePath(const std::filesystem::path& Root, const std::filesystem::path& Abs) { - ConfigPath = Utf8ToWide(m_Config.TargetSpecification.substr(FileHydratorPrefix.length())); - } - else - { - CbObjectView Settings = m_Config.Options["settings"].AsObjectView(); - std::string_view Path = Settings["path"].AsString(); - if (Path.empty()) + auto [_, ItAbs] = std::mismatch(Root.begin(), Root.end(), Abs.begin(), Abs.end()); + std::filesystem::path RelativePath; + for (auto I = ItAbs; I != Abs.end(); I++) { - throw zen::runtime_error("Hydration config 'file' type requires 'settings.path'"); + RelativePath = RelativePath / *I; } - ConfigPath = Utf8ToWide(std::string(Path)); + return RelativePath; } - MakeSafeAbsolutePathInPlace(ConfigPath); - if (!std::filesystem::exists(ConfigPath)) + void CleanDirectory(WorkerThreadPool& WorkerPool, + std::atomic<bool>& AbortFlag, + std::atomic<bool>& PauseFlag, + const std::filesystem::path& Path) { - throw std::invalid_argument(fmt::format("Target does not exist: '{}'", ConfigPath.string())); + CleanDirectory(WorkerPool, AbortFlag, PauseFlag, Path, std::vector<std::string>{}, {}, 0); } - m_StorageModuleRootDir = ConfigPath / m_Config.ModuleId; + class StorageBase + { + public: + virtual 
~StorageBase() {} + + virtual void Configure(std::string_view ModuleId, + const std::filesystem::path& TempDir, + std::string_view TargetSpecification, + const CbObject& Options) = 0; + virtual void SaveMetadata(const CbObject& Data) = 0; + virtual CbObject LoadMetadata() = 0; + virtual CbObject GetSettings() = 0; + virtual void ParseSettings(const CbObjectView& Settings) = 0; + virtual std::vector<IoHash> List() = 0; + virtual void Put(ParallelWork& Work, + WorkerThreadPool& WorkerPool, + const IoHash& Hash, + uint64_t Size, + const std::filesystem::path& SourcePath) = 0; + virtual void Get(ParallelWork& Work, + WorkerThreadPool& WorkerPool, + const IoHash& Hash, + uint64_t Size, + const std::filesystem::path& DestinationPath) = 0; + }; - CreateDirectories(m_StorageModuleRootDir); -} + constexpr std::string_view FileHydratorPrefix = "file://"; + constexpr std::string_view FileHydratorType = "file"; -void -FileHydrator::Hydrate() -{ - ZEN_INFO("Hydrating state from '{}' to '{}'", m_StorageModuleRootDir, m_Config.ServerStateDir); + constexpr std::string_view S3HydratorPrefix = "s3://"; + constexpr std::string_view S3HydratorType = "s3"; - Stopwatch Timer; + class FileStorage : public StorageBase + { + public: + FileStorage() {} + virtual void Configure(std::string_view ModuleId, + const std::filesystem::path& TempDir, + std::string_view TargetSpecification, + const CbObject& Options) + { + ZEN_UNUSED(TempDir); + if (!TargetSpecification.empty()) + { + m_StoragePath = Utf8ToWide(TargetSpecification.substr(FileHydratorPrefix.length())); + if (m_StoragePath.empty()) + { + throw zen::runtime_error("Hydration config 'file' type requires a directory path"); + } + } + else + { + CbObjectView Settings = Options["settings"].AsObjectView(); + std::string_view Path = Settings["path"].AsString(); + if (Path.empty()) + { + throw zen::runtime_error("Hydration config 'file' type requires 'settings.path'"); + } + m_StoragePath = Utf8ToWide(std::string(Path)); + } + m_StoragePath = 
m_StoragePath / ModuleId; + MakeSafeAbsolutePathInPlace(m_StoragePath); + + m_StatePathName = m_StoragePath / "current-state.cbo"; + m_CASPath = m_StoragePath / "cas"; + CreateDirectories(m_CASPath); + } + virtual void SaveMetadata(const CbObject& Data) + { + BinaryWriter Output; + SaveCompactBinary(Output, Data); + WriteFile(m_StatePathName, IoBuffer(IoBuffer::Wrap, Output.GetData(), Output.GetSize())); + } + virtual CbObject LoadMetadata() + { + if (!IsFile(m_StatePathName)) + { + return {}; + } + FileContents Content = ReadFile(m_StatePathName); + if (Content.ErrorCode) + { + ThrowSystemError(Content.ErrorCode.value(), "Failed to read state file"); + } + IoBuffer Payload = Content.Flatten(); + CbValidateError Error; + CbObject Result = ValidateAndReadCompactBinaryObject(std::move(Payload), Error); + if (Error != CbValidateError::None) + { + throw std::runtime_error(fmt::format("Failed to read {} state file. Reason: {}", m_StatePathName, ToString(Error))); + } + return Result; + } - // Ensure target is clean - ZEN_DEBUG("Wiping server state at '{}'", m_Config.ServerStateDir); - const bool ForceRemoveReadOnlyFiles = true; - CleanDirectory(m_Config.ServerStateDir, ForceRemoveReadOnlyFiles); + virtual CbObject GetSettings() override { return {}; } + virtual void ParseSettings(const CbObjectView& Settings) { ZEN_UNUSED(Settings); } - bool WipeServerState = false; + virtual std::vector<IoHash> List() + { + DirectoryContent DirContent; + GetDirectoryContent(m_CASPath, DirectoryContentFlags::IncludeFiles, DirContent); + std::vector<IoHash> Result; + Result.reserve(DirContent.Files.size()); + for (const std::filesystem::path& Path : DirContent.Files) + { + IoHash Hash; + if (IoHash::TryParse(Path.filename().string(), Hash)) + { + Result.push_back(Hash); + } + } + return Result; + } - try - { - ZEN_DEBUG("Copying '{}' to '{}'", m_StorageModuleRootDir, m_Config.ServerStateDir); - CopyTree(m_StorageModuleRootDir, m_Config.ServerStateDir, {.EnableClone = true}); - } - catch 
(std::exception& Ex) - { - ZEN_WARN("Copy failed: {}. Will wipe any partially copied state from '{}'", Ex.what(), m_Config.ServerStateDir); + virtual void Put(ParallelWork& Work, + WorkerThreadPool& WorkerPool, + const IoHash& Hash, + uint64_t Size, + const std::filesystem::path& SourcePath) + { + ZEN_UNUSED(Size); + Work.ScheduleWork(WorkerPool, + [this, Hash = IoHash(Hash), SourcePath = std::filesystem::path(SourcePath)](std::atomic<bool>& AbortFlag) { + if (!AbortFlag.load()) + { + CopyFile(SourcePath, m_CASPath / fmt::format("{}", Hash), CopyFileOptions{.EnableClone = true}); + } + }); + } - // We don't do the clean right here to avoid potentially running into double-throws - WipeServerState = true; - } + virtual void Get(ParallelWork& Work, + WorkerThreadPool& WorkerPool, + const IoHash& Hash, + uint64_t Size, + const std::filesystem::path& DestinationPath) + { + ZEN_UNUSED(Size); + Work.ScheduleWork( + WorkerPool, + [this, Hash = IoHash(Hash), DestinationPath = std::filesystem::path(DestinationPath)](std::atomic<bool>& AbortFlag) { + if (!AbortFlag.load()) + { + CopyFile(m_CASPath / fmt::format("{}", Hash), DestinationPath, CopyFileOptions{.EnableClone = true}); + } + }); + } - if (WipeServerState) - { - ZEN_DEBUG("Cleaning server state '{}'", m_Config.ServerStateDir); - CleanDirectory(m_Config.ServerStateDir, ForceRemoveReadOnlyFiles); - } - else + private: + std::filesystem::path m_StoragePath; + std::filesystem::path m_StatePathName; + std::filesystem::path m_CASPath; + }; + + class S3Storage : public StorageBase { - ZEN_INFO("Hydration complete in {}", NiceTimeSpanMs(Timer.GetElapsedTimeMs())); - } -} + public: + S3Storage() {} -void -FileHydrator::Dehydrate() -{ - ZEN_INFO("Dehydrating state from '{}' to '{}'", m_Config.ServerStateDir, m_StorageModuleRootDir); + virtual void Configure(std::string_view ModuleId, + const std::filesystem::path& TempDir, + std::string_view TargetSpecification, + const CbObject& Options) + { + m_Options = Options; - Stopwatch 
Timer; + CbObjectView Settings = m_Options["settings"].AsObjectView(); + std::string_view Spec; + if (!TargetSpecification.empty()) + { + Spec = TargetSpecification; + Spec.remove_prefix(S3HydratorPrefix.size()); + } + else + { + std::string_view Uri = Settings["uri"].AsString(); + if (Uri.empty()) + { + throw zen::runtime_error("Incremental S3 hydration config requires 'settings.uri'"); + } + Spec = Uri; + Spec.remove_prefix(S3HydratorPrefix.size()); + } - const std::filesystem::path TargetDir = m_StorageModuleRootDir; + size_t SlashPos = Spec.find('/'); + std::string UserPrefix = SlashPos != std::string_view::npos ? std::string(Spec.substr(SlashPos + 1)) : std::string{}; + m_Bucket = std::string(SlashPos != std::string_view::npos ? Spec.substr(0, SlashPos) : Spec); + m_KeyPrefix = UserPrefix.empty() ? std::string(ModuleId) : UserPrefix + "/" + std::string(ModuleId); - // Ensure target is clean. This could be replaced with an atomic copy at a later date - // (i.e copy into a temporary directory name and rename it once complete) + ZEN_ASSERT(!m_Bucket.empty()); - ZEN_DEBUG("Cleaning storage root '{}'", TargetDir); - const bool ForceRemoveReadOnlyFiles = true; - CleanDirectory(TargetDir, ForceRemoveReadOnlyFiles); + std::string Region = std::string(Settings["region"].AsString()); + if (Region.empty()) + { + Region = GetEnvVariable("AWS_DEFAULT_REGION"); + } + if (Region.empty()) + { + Region = GetEnvVariable("AWS_REGION"); + } + if (Region.empty()) + { + Region = "us-east-1"; + } + m_Region = std::move(Region); - bool CopySuccess = true; + std::string AccessKeyId = GetEnvVariable("AWS_ACCESS_KEY_ID"); + if (AccessKeyId.empty()) + { + m_CredentialProvider = Ref<ImdsCredentialProvider>(new ImdsCredentialProvider({})); + } + else + { + m_Credentials.AccessKeyId = std::move(AccessKeyId); + m_Credentials.SecretAccessKey = GetEnvVariable("AWS_SECRET_ACCESS_KEY"); + m_Credentials.SessionToken = GetEnvVariable("AWS_SESSION_TOKEN"); + } + m_TempDir = TempDir; + m_Client = 
CreateS3Client(); + } - try - { - ZEN_DEBUG("Copying '{}' to '{}'", m_Config.ServerStateDir, TargetDir); - for (const std::filesystem::directory_entry& Entry : std::filesystem::directory_iterator(m_Config.ServerStateDir)) + virtual void SaveMetadata(const CbObject& Data) { - if (Entry.path().filename() == ".sentry-native") + S3Client& Client = *m_Client; + BinaryWriter Output; + SaveCompactBinary(Output, Data); + IoBuffer Payload(IoBuffer::Clone, Output.GetData(), Output.GetSize()); + + std::string Key = m_KeyPrefix + "/incremental-state.cbo"; + S3Result Result = Client.PutObject(Key, std::move(Payload)); + if (!Result.IsSuccess()) { - continue; + throw zen::runtime_error("Failed to save incremental metadata to '{}': {}", Key, Result.Error); } - std::filesystem::path Dest = TargetDir / Entry.path().filename(); - if (Entry.is_directory()) + } + + virtual CbObject LoadMetadata() + { + S3Client& Client = *m_Client; + std::string Key = m_KeyPrefix + "/incremental-state.cbo"; + S3GetObjectResult Result = Client.GetObject(Key); + if (!Result.IsSuccess()) { - CreateDirectories(Dest); - CopyTree(Entry.path(), Dest, {.EnableClone = true}); + if (Result.Error == S3GetObjectResult::NotFoundErrorText) + { + return {}; + } + throw zen::runtime_error("Failed to load incremental metadata from '{}': {}", Key, Result.Error); } - else + + CbValidateError Error; + CbObject Meta = ValidateAndReadCompactBinaryObject(std::move(Result.Content), Error); + if (Error != CbValidateError::None) { - CopyFile(Entry.path(), Dest, {.EnableClone = true}); + throw zen::runtime_error("Failed to parse incremental metadata from '{}': {}", Key, ToString(Error)); } + return Meta; } - } - catch (std::exception& Ex) - { - ZEN_WARN("Copy failed: {}. 
Will wipe any partially copied state from '{}'", Ex.what(), m_StorageModuleRootDir); - // We don't do the clean right here to avoid potentially running into double-throws - CopySuccess = false; - } + virtual CbObject GetSettings() override + { + CbObjectWriter Writer; + Writer << "MultipartChunkSize" << m_MultipartChunkSize; + return Writer.Save(); + } - if (!CopySuccess) - { - ZEN_DEBUG("Removing partially copied state from '{}'", TargetDir); - CleanDirectory(TargetDir, ForceRemoveReadOnlyFiles); - } + virtual void ParseSettings(const CbObjectView& Settings) + { + m_MultipartChunkSize = Settings["MultipartChunkSize"].AsUInt64(DefaultMultipartChunkSize); + } - ZEN_DEBUG("Wiping server state '{}'", m_Config.ServerStateDir); - CleanDirectory(m_Config.ServerStateDir, ForceRemoveReadOnlyFiles); + virtual std::vector<IoHash> List() + { + S3Client& Client = *m_Client; + std::string Prefix = m_KeyPrefix + "/cas/"; + S3ListObjectsResult Result = Client.ListObjects(Prefix); + if (!Result.IsSuccess()) + { + throw zen::runtime_error("Failed to list S3 objects under '{}': {}", Prefix, Result.Error); + } - if (CopySuccess) - { - ZEN_INFO("Dehydration complete in {}", NiceTimeSpanMs(Timer.GetElapsedTimeMs())); - } -} + std::vector<IoHash> Hashes; + Hashes.reserve(Result.Objects.size()); + for (const S3ObjectInfo& Obj : Result.Objects) + { + size_t LastSlash = Obj.Key.rfind('/'); + if (LastSlash == std::string::npos) + { + continue; + } + IoHash Hash; + if (IoHash::TryParse(Obj.Key.substr(LastSlash + 1), Hash)) + { + Hashes.push_back(Hash); + } + } + return Hashes; + } -/////////////////////////////////////////////////////////////////////////// + virtual void Put(ParallelWork& Work, + WorkerThreadPool& WorkerPool, + const IoHash& Hash, + uint64_t Size, + const std::filesystem::path& SourcePath) + { + Work.ScheduleWork( + WorkerPool, + [this, Hash = IoHash(Hash), Size, SourcePath = std::filesystem::path(SourcePath)](std::atomic<bool>& AbortFlag) { + if (AbortFlag.load()) + { + 
return; + } + S3Client& Client = *m_Client; + std::string Key = m_KeyPrefix + "/cas/" + fmt::format("{}", Hash); -constexpr std::string_view S3HydratorPrefix = "s3://"; -constexpr std::string_view S3HydratorType = "s3"; + if (Size >= (m_MultipartChunkSize + (m_MultipartChunkSize / 4))) + { + BasicFile File(SourcePath, BasicFile::Mode::kRead); + S3Result Result = Client.PutObjectMultipart( + Key, + Size, + [&File](uint64_t Offset, uint64_t ChunkSize) { return File.ReadRange(Offset, ChunkSize); }, + m_MultipartChunkSize); + if (!Result.IsSuccess()) + { + throw zen::runtime_error("Failed to upload '{}' to S3: {}", Key, Result.Error); + } + } + else + { + BasicFile File(SourcePath, BasicFile::Mode::kRead); + S3Result Result = Client.PutObject(Key, File.ReadAll()); + if (!Result.IsSuccess()) + { + throw zen::runtime_error("Failed to upload '{}' to S3: {}", Key, Result.Error); + } + } + }); + } -struct S3Hydrator : public HydrationStrategyBase -{ - void Configure(const HydrationConfig& Config) override; - void Dehydrate() override; - void Hydrate() override; + virtual void Get(ParallelWork& Work, + WorkerThreadPool& WorkerPool, + const IoHash& Hash, + uint64_t Size, + const std::filesystem::path& DestinationPath) + { + std::string Key = m_KeyPrefix + "/cas/" + fmt::format("{}", Hash); -private: - S3Client CreateS3Client() const; - std::string BuildTimestampFolderName() const; - std::string MakeObjectKey(std::string_view FolderName, const std::filesystem::path& RelPath) const; - - HydrationConfig m_Config; - std::string m_Bucket; - std::string m_KeyPrefix; // "<user-prefix>/<ModuleId>" or just "<ModuleId>" - no trailing slash - std::string m_Region; - SigV4Credentials m_Credentials; - Ref<ImdsCredentialProvider> m_CredentialProvider; - - static constexpr uint64_t MultipartChunkSize = 8 * 1024 * 1024; -}; + if (Size >= (m_MultipartChunkSize + (m_MultipartChunkSize / 4))) + { + class WorkData + { + public: + WorkData(const std::filesystem::path& DestPath, uint64_t Size) : 
m_DestFile(DestPath, BasicFile::Mode::kTruncate) + { + PrepareFileForScatteredWrite(m_DestFile.Handle(), Size); + } + ~WorkData() { m_DestFile.Flush(); } + void Write(const void* Data, uint64_t Size, uint64_t Offset) { m_DestFile.Write(Data, Size, Offset); } -void -S3Hydrator::Configure(const HydrationConfig& Config) -{ - m_Config = Config; + private: + BasicFile m_DestFile; + }; - CbObjectView Settings = m_Config.Options["settings"].AsObjectView(); - std::string_view Spec; - if (!m_Config.TargetSpecification.empty()) - { - Spec = m_Config.TargetSpecification; - Spec.remove_prefix(S3HydratorPrefix.size()); - } - else - { - std::string_view Uri = Settings["uri"].AsString(); - if (Uri.empty()) + std::shared_ptr<WorkData> Data = std::make_shared<WorkData>(DestinationPath, Size); + + uint64_t Offset = 0; + while (Offset < Size) + { + uint64_t ChunkSize = std::min<uint64_t>(m_MultipartChunkSize, Size - Offset); + + Work.ScheduleWork(WorkerPool, [this, Key = Key, Offset, ChunkSize, Data](std::atomic<bool>& AbortFlag) { + if (AbortFlag) + { + return; + } + S3GetObjectResult Chunk = m_Client->GetObjectRange(Key, Offset, ChunkSize); + if (!Chunk.IsSuccess()) + { + throw zen::runtime_error("Failed to download '{}' bytes [{}-{}] from S3: {}", + Key, + Offset, + Offset + ChunkSize - 1, + Chunk.Error); + } + + Data->Write(Chunk.Content.GetData(), Chunk.Content.GetSize(), Offset); + }); + Offset += ChunkSize; + } + } + else + { + Work.ScheduleWork( + WorkerPool, + [this, Key = Key, DestinationPath = std::filesystem::path(DestinationPath)](std::atomic<bool>& AbortFlag) { + if (AbortFlag) + { + return; + } + S3GetObjectResult Chunk = m_Client->GetObject(Key, m_TempDir); + if (!Chunk.IsSuccess()) + { + throw zen::runtime_error("Failed to download '{}' from S3: {}", Key, Chunk.Error); + } + + if (IoBufferFileReference FileRef; Chunk.Content.GetFileReference(FileRef)) + { + std::error_code Ec; + std::filesystem::path ChunkPath = PathFromHandle(FileRef.FileHandle, Ec); + if (Ec) + { + 
WriteFile(DestinationPath, Chunk.Content); + } + else + { + Chunk.Content.SetDeleteOnClose(false); + Chunk.Content = {}; + RenameFile(ChunkPath, DestinationPath, Ec); + if (Ec) + { + Chunk.Content = IoBufferBuilder::MakeFromFile(ChunkPath); + Chunk.Content.SetDeleteOnClose(true); + WriteFile(DestinationPath, Chunk.Content); + } + } + } + else + { + WriteFile(DestinationPath, Chunk.Content); + } + }); + } + } + + private: + std::unique_ptr<S3Client> CreateS3Client() const { - throw zen::runtime_error("Hydration config 's3' type requires 'settings.uri'"); + S3ClientOptions Options; + Options.BucketName = m_Bucket; + Options.Region = m_Region; + + CbObjectView Settings = m_Options["settings"].AsObjectView(); + std::string_view Endpoint = Settings["endpoint"].AsString(); + if (!Endpoint.empty()) + { + Options.Endpoint = std::string(Endpoint); + Options.PathStyle = Settings["path-style"].AsBool(); + } + + if (m_CredentialProvider) + { + Options.CredentialProvider = m_CredentialProvider; + } + else + { + Options.Credentials = m_Credentials; + } + + Options.HttpSettings.MaximumInMemoryDownloadSize = 16u * 1024u; + + return std::make_unique<S3Client>(Options); } - Spec = Uri; - Spec.remove_prefix(S3HydratorPrefix.size()); - } - size_t SlashPos = Spec.find('/'); - std::string UserPrefix = SlashPos != std::string_view::npos ? std::string(Spec.substr(SlashPos + 1)) : std::string{}; - m_Bucket = std::string(SlashPos != std::string_view::npos ? Spec.substr(0, SlashPos) : Spec); - m_KeyPrefix = UserPrefix.empty() ? 
m_Config.ModuleId : UserPrefix + "/" + m_Config.ModuleId; + static constexpr uint64_t DefaultMultipartChunkSize = 32u * 1024u * 1024u; + + std::string m_KeyPrefix; + CbObject m_Options; + std::string m_Bucket; + std::string m_Region; + SigV4Credentials m_Credentials; + Ref<ImdsCredentialProvider> m_CredentialProvider; + std::unique_ptr<S3Client> m_Client; + std::filesystem::path m_TempDir; + uint64_t m_MultipartChunkSize = DefaultMultipartChunkSize; + }; - ZEN_ASSERT(!m_Bucket.empty()); +} // namespace hydration_impl - std::string Region = std::string(Settings["region"].AsString()); - if (Region.empty()) - { - Region = GetEnvVariable("AWS_DEFAULT_REGION"); - } - if (Region.empty()) - { - Region = GetEnvVariable("AWS_REGION"); - } - if (Region.empty()) - { - Region = "us-east-1"; - } - m_Region = std::move(Region); +using namespace hydration_impl; - std::string AccessKeyId = GetEnvVariable("AWS_ACCESS_KEY_ID"); - if (AccessKeyId.empty()) - { - m_CredentialProvider = Ref<ImdsCredentialProvider>(new ImdsCredentialProvider({})); - } - else - { - m_Credentials.AccessKeyId = std::move(AccessKeyId); - m_Credentials.SecretAccessKey = GetEnvVariable("AWS_SECRET_ACCESS_KEY"); - m_Credentials.SessionToken = GetEnvVariable("AWS_SESSION_TOKEN"); - } -} +/////////////////////////////////////////////////////////////////////////// -S3Client -S3Hydrator::CreateS3Client() const +class IncrementalHydrator : public HydrationStrategyBase { - S3ClientOptions Options; - Options.BucketName = m_Bucket; - Options.Region = m_Region; +public: + IncrementalHydrator(std::unique_ptr<StorageBase>&& Storage); + virtual ~IncrementalHydrator() override; + virtual void Configure(const HydrationConfig& Config) override; + virtual void Dehydrate(const CbObject& CachedState) override; + virtual CbObject Hydrate() override; - CbObjectView Settings = m_Config.Options["settings"].AsObjectView(); - std::string_view Endpoint = Settings["endpoint"].AsString(); - if (!Endpoint.empty()) - { - Options.Endpoint = 
std::string(Endpoint); - Options.PathStyle = Settings["path-style"].AsBool(); - } - - if (m_CredentialProvider) - { - Options.CredentialProvider = m_CredentialProvider; - } - else +private: + struct Entry { - Options.Credentials = m_Credentials; - } + std::filesystem::path RelativePath; + uint64_t Size; + uint64_t ModTick; + IoHash Hash; + }; - Options.HttpSettings.MaximumInMemoryDownloadSize = 16u * 1024u; + std::unique_ptr<StorageBase> m_Storage; + HydrationConfig m_Config; + WorkerThreadPool m_FallbackWorkPool; + std::atomic<bool> m_FallbackAbortFlag{false}; + std::atomic<bool> m_FallbackPauseFlag{false}; + HydrationConfig::ThreadingOptions m_Threading{.WorkerPool = &m_FallbackWorkPool, + .AbortFlag = &m_FallbackAbortFlag, + .PauseFlag = &m_FallbackPauseFlag}; +}; - return S3Client(Options); +IncrementalHydrator::IncrementalHydrator(std::unique_ptr<StorageBase>&& Storage) : m_Storage(std::move(Storage)), m_FallbackWorkPool(0) +{ } -std::string -S3Hydrator::BuildTimestampFolderName() const +IncrementalHydrator::~IncrementalHydrator() { - UtcTime Now = UtcTime::Now(); - return fmt::format("{:04d}{:02d}{:02d}-{:02d}{:02d}{:02d}-{:03d}", - Now.Tm.tm_year + 1900, - Now.Tm.tm_mon + 1, - Now.Tm.tm_mday, - Now.Tm.tm_hour, - Now.Tm.tm_min, - Now.Tm.tm_sec, - Now.Ms); + m_Storage.reset(); } -std::string -S3Hydrator::MakeObjectKey(std::string_view FolderName, const std::filesystem::path& RelPath) const +void +IncrementalHydrator::Configure(const HydrationConfig& Config) { - return m_KeyPrefix + "/" + std::string(FolderName) + "/" + RelPath.generic_string(); + m_Config = Config; + m_Storage->Configure(Config.ModuleId, Config.TempDir, Config.TargetSpecification, Config.Options); + if (Config.Threading) + { + m_Threading = *Config.Threading; + } } void -S3Hydrator::Dehydrate() +IncrementalHydrator::Dehydrate(const CbObject& CachedState) { - ZEN_INFO("Dehydrating state from '{}' to s3://{}/{}", m_Config.ServerStateDir, m_Bucket, m_KeyPrefix); + Stopwatch Timer; + const 
std::filesystem::path ServerStateDir = MakeSafeAbsolutePath(m_Config.ServerStateDir); try { - S3Client Client = CreateS3Client(); - std::string FolderName = BuildTimestampFolderName(); - uint64_t TotalBytes = 0; - uint32_t FileCount = 0; - Stopwatch Timer; + std::unordered_map<std::string, size_t> StateEntryLookup; + std::vector<Entry> StateEntries; + for (CbFieldView FieldView : CachedState["Files"].AsArrayView()) + { + CbObjectView EntryView = FieldView.AsObjectView(); + std::filesystem::path RelativePath(EntryView["Path"].AsString()); + uint64_t Size = EntryView["Size"].AsUInt64(); + uint64_t ModTick = EntryView["ModTick"].AsUInt64(); + IoHash Hash = EntryView["Hash"].AsHash(); + + StateEntryLookup.insert_or_assign(RelativePath.generic_string(), StateEntries.size()); + StateEntries.push_back(Entry{.RelativePath = RelativePath, .Size = Size, .ModTick = ModTick, .Hash = Hash}); + } DirectoryContent DirContent; - GetDirectoryContent(m_Config.ServerStateDir, DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::Recursive, DirContent); + GetDirectoryContent(*m_Threading.WorkerPool, + ServerStateDir, + DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::Recursive | + DirectoryContentFlags::IncludeFileSizes | DirectoryContentFlags::IncludeModificationTick, + DirContent); + + ZEN_INFO("Dehydrating module '{}' from folder '{}'. 
{} ({}) files", + m_Config.ModuleId, + m_Config.ServerStateDir, + DirContent.Files.size(), + NiceBytes(std::accumulate(DirContent.FileSizes.begin(), DirContent.FileSizes.end(), uint64_t(0)))); + + std::vector<Entry> Entries; + Entries.resize(DirContent.Files.size()); + + uint64_t TotalBytes = 0; + uint64_t TotalFiles = 0; + uint64_t HashedFiles = 0; + uint64_t HashedBytes = 0; + + std::unordered_set<IoHash> ExistsLookup; - for (const std::filesystem::path& AbsPath : DirContent.Files) { - std::filesystem::path RelPath = AbsPath.lexically_relative(m_Config.ServerStateDir); - if (RelPath.empty() || *RelPath.begin() == "..") - { - throw zen::runtime_error( - "lexically_relative produced a '..'-escape path for '{}' relative to '{}' - " - "path form mismatch (e.g. \\\\?\\ prefix on one but not the other)", - AbsPath.string(), - m_Config.ServerStateDir.string()); - } - if (*RelPath.begin() == ".sentry-native") + ParallelWork Work(*m_Threading.AbortFlag, *m_Threading.PauseFlag, WorkerThreadPool::EMode::EnableBacklog); + + for (size_t FileIndex = 0; FileIndex < DirContent.Files.size(); FileIndex++) { - continue; - } - std::string Key = MakeObjectKey(FolderName, RelPath); + const std::filesystem::path AbsPath = MakeSafeAbsolutePath(DirContent.Files[FileIndex]); + if (AbsPath.filename() == "reserve.gc") + { + continue; + } + const std::filesystem::path RelativePath = FastRelativePath(ServerStateDir, DirContent.Files[FileIndex]); + if (*RelativePath.begin() == ".sentry-native") + { + continue; + } + if (RelativePath == ".lock") + { + continue; + } - BasicFile File(AbsPath, BasicFile::Mode::kRead); - uint64_t FileSize = File.FileSize(); + Entry& CurrentEntry = Entries[TotalFiles]; + CurrentEntry.RelativePath = RelativePath; + CurrentEntry.Size = DirContent.FileSizes[FileIndex]; + CurrentEntry.ModTick = DirContent.FileModificationTicks[FileIndex]; - S3Result UploadResult = Client.PutObjectMultipart( - Key, - FileSize, - [&File](uint64_t Offset, uint64_t Size) { return 
File.ReadRange(Offset, Size); }, - MultipartChunkSize); - if (!UploadResult.IsSuccess()) - { - throw zen::runtime_error("Failed to upload '{}' to S3: {}", Key, UploadResult.Error); + bool FoundHash = false; + if (auto KnownIt = StateEntryLookup.find(CurrentEntry.RelativePath.generic_string()); KnownIt != StateEntryLookup.end()) + { + const Entry& StateEntry = StateEntries[KnownIt->second]; + if (StateEntry.Size == CurrentEntry.Size && StateEntry.ModTick == CurrentEntry.ModTick) + { + CurrentEntry.Hash = StateEntry.Hash; + FoundHash = true; + } + } + + if (!FoundHash) + { + Work.ScheduleWork(*m_Threading.WorkerPool, [AbsPath, EntryIndex = TotalFiles, &Entries](std::atomic<bool>& AbortFlag) { + if (AbortFlag) + { + return; + } + + Entry& CurrentEntry = Entries[EntryIndex]; + + bool FoundHash = false; + if (AbsPath.extension().empty()) + { + auto It = CurrentEntry.RelativePath.begin(); + if (It != CurrentEntry.RelativePath.end() && It->filename().string().ends_with("cas")) + { + IoHash RawHash; + uint64_t RawSize; + CompressedBuffer Compressed = + CompressedBuffer::FromCompressed(SharedBuffer(IoBufferBuilder::MakeFromFile(AbsPath)), + RawHash, + RawSize); + if (Compressed) + { + // We compose a meta-hash since taking the RawHash might collide with an existing + // non-compressed file with the same content The collision is unlikely except if the + // compressed data is zero bytes causing RawHash to be the same as an empty file. 
+ IoHashStream Hasher; + Hasher.Append(RawHash.Hash, sizeof(RawHash.Hash)); + Hasher.Append(&CurrentEntry.Size, sizeof(CurrentEntry.Size)); + CurrentEntry.Hash = Hasher.GetHash(); + FoundHash = true; + } + } + } + + if (!FoundHash) + { + CurrentEntry.Hash = IoHash::HashBuffer(IoBufferBuilder::MakeFromFile(AbsPath)); + } + }); + HashedFiles++; + HashedBytes += CurrentEntry.Size; + } + TotalFiles++; + TotalBytes += CurrentEntry.Size; } - TotalBytes += FileSize; - ++FileCount; + std::vector<IoHash> ExistingEntries = m_Storage->List(); + ExistsLookup.insert(ExistingEntries.begin(), ExistingEntries.end()); + + Work.Wait(); + + Entries.resize(TotalFiles); } - // Write current-state.json - uint64_t UploadDurationMs = Timer.GetElapsedTimeMs(); - - UtcTime Now = UtcTime::Now(); - std::string UploadTimeUtc = fmt::format("{:04d}-{:02d}-{:02d}T{:02d}:{:02d}:{:02d}.{:03d}Z", - Now.Tm.tm_year + 1900, - Now.Tm.tm_mon + 1, - Now.Tm.tm_mday, - Now.Tm.tm_hour, - Now.Tm.tm_min, - Now.Tm.tm_sec, - Now.Ms); - - CbObjectWriter Meta; - Meta << "FolderName" << FolderName; - Meta << "ModuleId" << m_Config.ModuleId; - Meta << "HostName" << GetMachineName(); - Meta << "UploadTimeUtc" << UploadTimeUtc; - Meta << "UploadDurationMs" << UploadDurationMs; - Meta << "TotalSizeBytes" << TotalBytes; - Meta << "FileCount" << FileCount; - - ExtendableStringBuilder<1024> JsonBuilder; - Meta.Save().ToJson(JsonBuilder); - - std::string MetaKey = m_KeyPrefix + "/current-state.json"; - std::string_view JsonText = JsonBuilder.ToView(); - IoBuffer MetaBuf(IoBuffer::Clone, JsonText.data(), JsonText.size()); - S3Result MetaUploadResult = Client.PutObject(MetaKey, std::move(MetaBuf)); - if (!MetaUploadResult.IsSuccess()) + uint64_t UploadedFiles = 0; + uint64_t UploadedBytes = 0; { - throw zen::runtime_error("Failed to write current-state.json to '{}': {}", MetaKey, MetaUploadResult.Error); + ParallelWork Work(*m_Threading.AbortFlag, *m_Threading.PauseFlag, WorkerThreadPool::EMode::DisableBacklog); + + for 
(const Entry& CurrentEntry : Entries) + { + if (!ExistsLookup.contains(CurrentEntry.Hash)) + { + m_Storage->Put(Work, + *m_Threading.WorkerPool, + CurrentEntry.Hash, + CurrentEntry.Size, + MakeSafeAbsolutePath(ServerStateDir / CurrentEntry.RelativePath)); + UploadedFiles++; + UploadedBytes += CurrentEntry.Size; + } + } + + Work.Wait(); + uint64_t UploadTimeMs = Timer.GetElapsedTimeMs(); + + UtcTime Now = UtcTime::Now(); + std::string UploadTimeUtc = fmt::format("{:04d}-{:02d}-{:02d}T{:02d}:{:02d}:{:02d}.{:03d}Z", + Now.Tm.tm_year + 1900, + Now.Tm.tm_mon + 1, + Now.Tm.tm_mday, + Now.Tm.tm_hour, + Now.Tm.tm_min, + Now.Tm.tm_sec, + Now.Ms); + + CbObjectWriter Meta; + Meta << "SourceFolder" << ServerStateDir.generic_string(); + Meta << "ModuleId" << m_Config.ModuleId; + Meta << "HostName" << GetMachineName(); + Meta << "UploadTimeUtc" << UploadTimeUtc; + Meta << "UploadDurationMs" << UploadTimeMs; + Meta << "TotalSizeBytes" << TotalBytes; + Meta << "StorageSettings" << m_Storage->GetSettings(); + + Meta.BeginArray("Files"); + for (const Entry& CurrentEntry : Entries) + { + Meta.BeginObject(); + { + Meta << "Path" << CurrentEntry.RelativePath.generic_string(); + Meta << "Size" << CurrentEntry.Size; + Meta << "ModTick" << CurrentEntry.ModTick; + Meta << "Hash" << CurrentEntry.Hash; + } + Meta.EndObject(); + } + Meta.EndArray(); + + m_Storage->SaveMetadata(Meta.Save()); } - ZEN_INFO("Dehydration complete: {} files, {}, {}", FileCount, NiceBytes(TotalBytes), NiceTimeSpanMs(UploadDurationMs)); + ZEN_DEBUG("Cleaning server state '{}'", m_Config.ServerStateDir); + CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, ServerStateDir); + + ZEN_INFO("Dehydration of module '{}' completed from folder '{}'. Hashed {} ({}). Uploaded {} ({}). 
Total {} ({}) in {}", + m_Config.ModuleId, + m_Config.ServerStateDir, + HashedFiles, + NiceBytes(HashedBytes), + UploadedFiles, + NiceBytes(UploadedBytes), + TotalFiles, + NiceBytes(TotalBytes), + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); } - catch (std::exception& Ex) + catch (const std::exception& Ex) { - // Any in-progress multipart upload has already been aborted by PutObjectMultipart. - // current-state.json is only written on success, so the previous S3 state remains valid. - ZEN_WARN("S3 dehydration failed: {}. S3 state not updated.", Ex.what()); + ZEN_WARN("Dehydration of module '{}' failed: {}. Leaving server state '{}'", m_Config.ModuleId, Ex.what(), m_Config.ServerStateDir); } } -void -S3Hydrator::Hydrate() +CbObject +IncrementalHydrator::Hydrate() { - ZEN_INFO("Hydrating state from s3://{}/{} to '{}'", m_Bucket, m_KeyPrefix, m_Config.ServerStateDir); - - Stopwatch Timer; - const bool ForceRemoveReadOnlyFiles = true; - - // Clean temp dir before starting in case of leftover state from a previous failed hydration - ZEN_DEBUG("Cleaning temp dir '{}'", m_Config.TempDir); - CleanDirectory(m_Config.TempDir, ForceRemoveReadOnlyFiles); - - bool WipeServerState = false; + Stopwatch Timer; + const std::filesystem::path ServerStateDir = MakeSafeAbsolutePath(m_Config.ServerStateDir); + const std::filesystem::path TempDir = MakeSafeAbsolutePath(m_Config.TempDir); try { - S3Client Client = CreateS3Client(); - std::string MetaKey = m_KeyPrefix + "/current-state.json"; - - S3GetObjectResult MetaResult = Client.GetObject(MetaKey); - if (!MetaResult.IsSuccess()) - { - if (MetaResult.Error == S3GetObjectResult::NotFoundErrorText) - { - ZEN_INFO("No state found in S3 at {}", MetaKey); - - ZEN_DEBUG("Wiping server state '{}'", m_Config.ServerStateDir); - CleanDirectory(m_Config.ServerStateDir, ForceRemoveReadOnlyFiles); - return; - } - throw zen::runtime_error("Failed to read current-state.json from '{}': {}", MetaKey, MetaResult.Error); - } - - std::string ParseError; - 
json11::Json MetaJson = json11::Json::parse(std::string(MetaResult.AsText()), ParseError); - if (!ParseError.empty()) + CbObject Meta = m_Storage->LoadMetadata(); + if (!Meta) { - throw zen::runtime_error("Failed to parse current-state.json from '{}': {}", MetaKey, ParseError); + ZEN_INFO("No dehydrated state for module {} found, cleaning server state: '{}'", m_Config.ModuleId, m_Config.ServerStateDir); + CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, ServerStateDir); + return CbObject(); } - std::string FolderName = MetaJson["FolderName"].string_value(); - if (FolderName.empty()) - { - throw zen::runtime_error("current-state.json from '{}' has missing or empty FolderName", MetaKey); - } + std::unordered_map<std::string, size_t> EntryLookup; + std::vector<Entry> Entries; + uint64_t TotalSize = 0; - std::string FolderPrefix = m_KeyPrefix + "/" + FolderName + "/"; - S3ListObjectsResult ListResult = Client.ListObjects(FolderPrefix); - if (!ListResult.IsSuccess()) + for (CbFieldView FieldView : Meta["Files"]) { - throw zen::runtime_error("Failed to list S3 objects under '{}': {}", FolderPrefix, ListResult.Error); - } - - for (const S3ObjectInfo& Obj : ListResult.Objects) - { - if (!Obj.Key.starts_with(FolderPrefix)) - { - ZEN_WARN("Skipping unexpected S3 key '{}' (expected prefix '{}')", Obj.Key, FolderPrefix); - continue; - } - - std::string RelKey = Obj.Key.substr(FolderPrefix.size()); - if (RelKey.empty()) + CbObjectView EntryView = FieldView.AsObjectView(); + if (EntryView) { - continue; + Entry NewEntry = {.RelativePath = std::filesystem::path(EntryView["Path"].AsString()), + .Size = EntryView["Size"].AsUInt64(), + .ModTick = EntryView["ModTick"].AsUInt64(), + .Hash = EntryView["Hash"].AsHash()}; + TotalSize += NewEntry.Size; + EntryLookup.insert_or_assign(NewEntry.RelativePath.generic_string(), Entries.size()); + Entries.emplace_back(std::move(NewEntry)); } - std::filesystem::path DestPath = 
MakeSafeAbsolutePath(m_Config.TempDir / std::filesystem::path(RelKey)); - CreateDirectories(DestPath.parent_path()); + } - if (Obj.Size > MultipartChunkSize) - { - BasicFile DestFile(DestPath, BasicFile::Mode::kTruncate); - DestFile.SetFileSize(Obj.Size); + ZEN_INFO("Hydrating module '{}' to folder '{}'. {} ({}) files", + m_Config.ModuleId, + m_Config.ServerStateDir, + Entries.size(), + NiceBytes(TotalSize)); - BasicFileWriter Writer(DestFile, 64 * 1024); + m_Storage->ParseSettings(Meta["StorageSettings"].AsObjectView()); - uint64_t Offset = 0; - while (Offset < Obj.Size) - { - uint64_t ChunkSize = std::min<uint64_t>(MultipartChunkSize, Obj.Size - Offset); - S3GetObjectResult Chunk = Client.GetObjectRange(Obj.Key, Offset, ChunkSize); - if (!Chunk.IsSuccess()) - { - throw zen::runtime_error("Failed to download '{}' bytes [{}-{}] from S3: {}", - Obj.Key, - Offset, - Offset + ChunkSize - 1, - Chunk.Error); - } + uint64_t DownloadedBytes = 0; + uint64_t DownloadedFiles = 0; - Writer.Write(Chunk.Content.GetData(), Chunk.Content.GetSize(), Offset); - Offset += ChunkSize; - } + { + ParallelWork Work(*m_Threading.AbortFlag, *m_Threading.PauseFlag, WorkerThreadPool::EMode::EnableBacklog); - Writer.Flush(); - } - else + for (const Entry& CurrentEntry : Entries) { - S3GetObjectResult Chunk = Client.GetObject(Obj.Key, m_Config.TempDir); - if (!Chunk.IsSuccess()) - { - throw zen::runtime_error("Failed to download '{}' from S3: {}", Obj.Key, Chunk.Error); - } - - if (IoBufferFileReference FileRef; Chunk.Content.GetFileReference(FileRef)) - { - std::error_code Ec; - std::filesystem::path ChunkPath = PathFromHandle(FileRef.FileHandle, Ec); - if (Ec) - { - WriteFile(DestPath, Chunk.Content); - } - else - { - Chunk.Content.SetDeleteOnClose(false); - Chunk.Content = {}; - RenameFile(ChunkPath, DestPath, Ec); - } - } - else - { - WriteFile(DestPath, Chunk.Content); - } + std::filesystem::path Path = MakeSafeAbsolutePath(TempDir / CurrentEntry.RelativePath); + 
CreateDirectories(Path.parent_path()); + m_Storage->Get(Work, *m_Threading.WorkerPool, CurrentEntry.Hash, CurrentEntry.Size, Path); + DownloadedBytes += CurrentEntry.Size; + DownloadedFiles++; } + + Work.Wait(); } // Downloaded successfully - swap into ServerStateDir - ZEN_DEBUG("Wiping server state '{}'", m_Config.ServerStateDir); - CleanDirectory(m_Config.ServerStateDir, ForceRemoveReadOnlyFiles); + ZEN_DEBUG("Cleaning server state '{}'", m_Config.ServerStateDir); + CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, ServerStateDir); // If the two paths share at least one common component they are on the same drive/volume // and atomic renames will succeed. Otherwise fall back to a full copy. - auto [ItTmp, ItState] = - std::mismatch(m_Config.TempDir.begin(), m_Config.TempDir.end(), m_Config.ServerStateDir.begin(), m_Config.ServerStateDir.end()); - if (ItTmp != m_Config.TempDir.begin()) + auto [ItTmp, ItState] = std::mismatch(TempDir.begin(), TempDir.end(), ServerStateDir.begin(), ServerStateDir.end()); + if (ItTmp != TempDir.begin()) { DirectoryContent DirContent; - GetDirectoryContent(m_Config.TempDir, DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::IncludeDirs, DirContent); + GetDirectoryContent(*m_Threading.WorkerPool, + TempDir, + DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::IncludeDirs, + DirContent); for (const std::filesystem::path& AbsPath : DirContent.Directories) { - std::filesystem::path Dest = MakeSafeAbsolutePath(m_Config.ServerStateDir / AbsPath.filename()); - RenameDirectory(AbsPath, Dest); + std::filesystem::path Dest = MakeSafeAbsolutePath(ServerStateDir / AbsPath.filename()); + std::error_code Ec = RenameDirectoryWithRetry(AbsPath, Dest); + if (Ec) + { + throw std::system_error(Ec, fmt::format("Failed to rename directory from '{}' to '{}'", AbsPath, Dest)); + } } for (const std::filesystem::path& AbsPath : DirContent.Files) { - std::filesystem::path Dest = 
MakeSafeAbsolutePath(m_Config.ServerStateDir / AbsPath.filename()); - RenameFile(AbsPath, Dest); + std::filesystem::path Dest = MakeSafeAbsolutePath(ServerStateDir / AbsPath.filename()); + std::error_code Ec = RenameFileWithRetry(AbsPath, Dest); + if (Ec) + { + throw std::system_error(Ec, fmt::format("Failed to rename file from '{}' to '{}'", AbsPath, Dest)); + } } ZEN_DEBUG("Cleaning temp dir '{}'", m_Config.TempDir); - CleanDirectory(m_Config.TempDir, ForceRemoveReadOnlyFiles); + CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, TempDir); } else { // Slow path: TempDir and ServerStateDir are on different filesystems, so rename // would fail. Copy the tree instead and clean up the temp files afterwards. ZEN_DEBUG("TempDir and ServerStateDir are on different filesystems - using CopyTree"); - CopyTree(m_Config.TempDir, m_Config.ServerStateDir, {.EnableClone = true}); + CopyTree(TempDir, ServerStateDir, {.EnableClone = true}); ZEN_DEBUG("Cleaning temp dir '{}'", m_Config.TempDir); - CleanDirectory(m_Config.TempDir, ForceRemoveReadOnlyFiles); + CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, TempDir); } - ZEN_INFO("Hydration complete from folder '{}' in {}", FolderName, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); - } - catch (std::exception& Ex) - { - ZEN_WARN("S3 hydration failed: {}. 
Will wipe any partially installed state.", Ex.what()); + // TODO: This could perhaps be done more efficiently, but ok for now + DirectoryContent DirContent; + GetDirectoryContent(*m_Threading.WorkerPool, + ServerStateDir, + DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::Recursive | + DirectoryContentFlags::IncludeFileSizes | DirectoryContentFlags::IncludeModificationTick, + DirContent); + + CbObjectWriter HydrateState; + HydrateState.BeginArray("Files"); + for (size_t FileIndex = 0; FileIndex < DirContent.Files.size(); FileIndex++) + { + std::filesystem::path RelativePath = FastRelativePath(ServerStateDir, DirContent.Files[FileIndex]); - // We don't do the clean right here to avoid potentially running into double-throws - WipeServerState = true; - } + if (auto It = EntryLookup.find(RelativePath.generic_string()); It != EntryLookup.end()) + { + HydrateState.BeginObject(); + { + HydrateState << "Path" << RelativePath.generic_string(); + HydrateState << "Size" << DirContent.FileSizes[FileIndex]; + HydrateState << "ModTick" << DirContent.FileModificationTicks[FileIndex]; + HydrateState << "Hash" << Entries[It->second].Hash; + } + HydrateState.EndObject(); + } + else + { + ZEN_ASSERT(false); + } + } + HydrateState.EndArray(); + + CbObject StateObject = HydrateState.Save(); - if (WipeServerState) + ZEN_INFO("Hydration of module '{}' complete to folder '{}'. {} ({}) files in {}", + m_Config.ModuleId, + m_Config.ServerStateDir, + DownloadedFiles, + NiceBytes(DownloadedBytes), + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + + return StateObject; + } + catch (const std::exception& Ex) { - ZEN_DEBUG("Cleaning server state '{}'", m_Config.ServerStateDir); - CleanDirectory(m_Config.ServerStateDir, ForceRemoveReadOnlyFiles); + ZEN_WARN("Hydration of module '{}' failed: {}. 
Cleaning server state '{}'", m_Config.ModuleId, Ex.what(), m_Config.ServerStateDir); + CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, ServerStateDir); ZEN_DEBUG("Cleaning temp dir '{}'", m_Config.TempDir); - CleanDirectory(m_Config.TempDir, ForceRemoveReadOnlyFiles); + CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, TempDir); + return {}; } } std::unique_ptr<HydrationStrategyBase> CreateHydrator(const HydrationConfig& Config) { + std::unique_ptr<StorageBase> Storage; + if (!Config.TargetSpecification.empty()) { if (StrCaseCompare(Config.TargetSpecification.substr(0, FileHydratorPrefix.length()), FileHydratorPrefix) == 0) { - std::unique_ptr<HydrationStrategyBase> Hydrator = std::make_unique<FileHydrator>(); - Hydrator->Configure(Config); - return Hydrator; + Storage = std::make_unique<FileStorage>(); } - if (StrCaseCompare(Config.TargetSpecification.substr(0, S3HydratorPrefix.length()), S3HydratorPrefix) == 0) + else if (StrCaseCompare(Config.TargetSpecification.substr(0, S3HydratorPrefix.length()), S3HydratorPrefix) == 0) { - std::unique_ptr<HydrationStrategyBase> Hydrator = std::make_unique<S3Hydrator>(); - Hydrator->Configure(Config); - return Hydrator; + Storage = std::make_unique<S3Storage>(); + } + else + { + throw std::runtime_error(fmt::format("Unknown hydration strategy: {}", Config.TargetSpecification)); } - throw std::runtime_error(fmt::format("Unknown hydration strategy: {}", Config.TargetSpecification)); - } - - std::string_view Type = Config.Options["type"].AsString(); - if (Type == FileHydratorType) - { - std::unique_ptr<HydrationStrategyBase> Hydrator = std::make_unique<FileHydrator>(); - Hydrator->Configure(Config); - return Hydrator; - } - if (Type == S3HydratorType) - { - std::unique_ptr<HydrationStrategyBase> Hydrator = std::make_unique<S3Hydrator>(); - Hydrator->Configure(Config); - return Hydrator; } - if (!Type.empty()) + else { - throw 
zen::runtime_error("Unknown hydration target type '{}'", Type); + std::string_view Type = Config.Options["type"].AsString(); + if (Type == FileHydratorType) + { + Storage = std::make_unique<FileStorage>(); + } + else if (Type == S3HydratorType) + { + Storage = std::make_unique<S3Storage>(); + } + else if (!Type.empty()) + { + throw zen::runtime_error("Unknown hydration target type '{}'", Type); + } + else + { + throw zen::runtime_error("No hydration target configured"); + } } - throw zen::runtime_error("No hydration target configured"); + + auto Hydrator = std::make_unique<IncrementalHydrator>(std::move(Storage)); + Hydrator->Configure(Config); + return Hydrator; } #if ZEN_WITH_TESTS namespace { + struct TestThreading + { + WorkerThreadPool WorkerPool; + std::atomic<bool> AbortFlag{false}; + std::atomic<bool> PauseFlag{false}; + HydrationConfig::ThreadingOptions Options{.WorkerPool = &WorkerPool, .AbortFlag = &AbortFlag, .PauseFlag = &PauseFlag}; + + explicit TestThreading(int ThreadCount) : WorkerPool(ThreadCount) {} + }; + /// Scoped RAII helper to set/restore a single environment variable within a test. /// Used to configure AWS credentials for each S3 test's MinIO instance /// without polluting the global environment. @@ -720,10 +1107,10 @@ namespace { /// subdir/file_b.bin /// subdir/nested/file_c.bin /// Returns a vector of (relative path, content) pairs for later verification. 
- std::vector<std::pair<std::filesystem::path, IoBuffer>> CreateTestTree(const std::filesystem::path& BaseDir) - { - std::vector<std::pair<std::filesystem::path, IoBuffer>> Files; + typedef std::vector<std::pair<std::filesystem::path, IoBuffer>> TestFileList; + TestFileList AddTestFiles(const std::filesystem::path& BaseDir, TestFileList& Files) + { auto AddFile = [&](std::filesystem::path RelPath, IoBuffer Content) { std::filesystem::path FullPath = BaseDir / RelPath; CreateDirectories(FullPath.parent_path()); @@ -737,9 +1124,33 @@ namespace { AddFile("subdir/nested/file_d.bin", CreateSemiRandomBlob(512)); AddFile("subdir/nested/file_e.bin", CreateSemiRandomBlob(512)); AddFile("subdir/nested/file_f.bin", CreateSemiRandomBlob(512)); + + return Files; + } + + TestFileList CreateSmallTestTree(const std::filesystem::path& BaseDir) + { + TestFileList Files; + AddTestFiles(BaseDir, Files); + return Files; + } + + TestFileList CreateTestTree(const std::filesystem::path& BaseDir) + { + TestFileList Files; + AddTestFiles(BaseDir, Files); + + auto AddFile = [&](std::filesystem::path RelPath, IoBuffer Content) { + std::filesystem::path FullPath = BaseDir / RelPath; + CreateDirectories(FullPath.parent_path()); + WriteFile(FullPath, Content); + Files.emplace_back(std::move(RelPath), std::move(Content)); + }; + AddFile("subdir/nested/medium.bulk", CreateSemiRandomBlob(256u * 1024u)); AddFile("subdir/nested/big.bulk", CreateSemiRandomBlob(512u * 1024u)); AddFile("subdir/nested/huge.bulk", CreateSemiRandomBlob(9u * 1024u * 1024u)); + AddFile("subdir/nested/biggest.bulk", CreateSemiRandomBlob(63u * 1024u * 1024u)); return Files; } @@ -777,7 +1188,7 @@ TEST_CASE("hydration.file.dehydrate_hydrate") CreateDirectories(HydrationTemp); const std::string ModuleId = "testmodule"; - auto TestFiles = CreateTestTree(ServerStateDir); + auto TestFiles = CreateSmallTestTree(ServerStateDir); HydrationConfig Config; Config.ServerStateDir = ServerStateDir; @@ -788,7 +1199,7 @@ 
TEST_CASE("hydration.file.dehydrate_hydrate") // Dehydrate: copy server state to file store { std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config); - Hydrator->Dehydrate(); + Hydrator->Dehydrate(CbObject()); } // Verify the module folder exists in the store and ServerStateDir was wiped @@ -805,7 +1216,7 @@ TEST_CASE("hydration.file.dehydrate_hydrate") VerifyTree(ServerStateDir, TestFiles); } -TEST_CASE("hydration.file.dehydrate_cleans_server_state") +TEST_CASE("hydration.file.hydrate_overwrites_existing_state") { ScopedTemporaryDirectory TempDir; @@ -816,7 +1227,7 @@ TEST_CASE("hydration.file.dehydrate_cleans_server_state") CreateDirectories(HydrationStore); CreateDirectories(HydrationTemp); - CreateTestTree(ServerStateDir); + auto TestFiles = CreateSmallTestTree(ServerStateDir); HydrationConfig Config; Config.ServerStateDir = ServerStateDir; @@ -824,14 +1235,26 @@ TEST_CASE("hydration.file.dehydrate_cleans_server_state") Config.ModuleId = "testmodule"; Config.TargetSpecification = "file://" + HydrationStore.string(); - std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config); - Hydrator->Dehydrate(); + // Dehydrate the original state + { + std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config); + Hydrator->Dehydrate(CbObject()); + } - // FileHydrator::Dehydrate() must wipe ServerStateDir when done - CHECK(std::filesystem::is_empty(ServerStateDir)); + // Put a stale file in ServerStateDir to simulate leftover state + WriteFile(ServerStateDir / "stale.bin", CreateSemiRandomBlob(256)); + + // Hydrate - must wipe stale file and restore original + { + std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config); + Hydrator->Hydrate(); + } + + CHECK_FALSE(std::filesystem::exists(ServerStateDir / "stale.bin")); + VerifyTree(ServerStateDir, TestFiles); } -TEST_CASE("hydration.file.hydrate_overwrites_existing_state") +TEST_CASE("hydration.file.excluded_files_not_dehydrated") { ScopedTemporaryDirectory 
TempDir; @@ -842,31 +1265,37 @@ TEST_CASE("hydration.file.hydrate_overwrites_existing_state") CreateDirectories(HydrationStore); CreateDirectories(HydrationTemp); - auto TestFiles = CreateTestTree(ServerStateDir); + auto TestFiles = CreateSmallTestTree(ServerStateDir); + + // Add files that the dehydrator should skip + WriteFile(ServerStateDir / "reserve.gc", CreateSemiRandomBlob(64)); + CreateDirectories(ServerStateDir / ".sentry-native"); + WriteFile(ServerStateDir / ".sentry-native" / "db.lock", CreateSemiRandomBlob(32)); + WriteFile(ServerStateDir / ".sentry-native" / "breadcrumb.json", CreateSemiRandomBlob(128)); HydrationConfig Config; Config.ServerStateDir = ServerStateDir; Config.TempDir = HydrationTemp; - Config.ModuleId = "testmodule"; + Config.ModuleId = "testmodule_excl"; Config.TargetSpecification = "file://" + HydrationStore.string(); - // Dehydrate the original state { std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config); - Hydrator->Dehydrate(); + Hydrator->Dehydrate(CbObject()); } - // Put a stale file in ServerStateDir to simulate leftover state - WriteFile(ServerStateDir / "stale.bin", CreateSemiRandomBlob(256)); - - // Hydrate - must wipe stale file and restore original + // Hydrate into a clean directory + CleanDirectory(ServerStateDir, true); { std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config); Hydrator->Hydrate(); } - CHECK_FALSE(std::filesystem::exists(ServerStateDir / "stale.bin")); + // Normal files must be restored VerifyTree(ServerStateDir, TestFiles); + // Excluded files must NOT be restored + CHECK_FALSE(std::filesystem::exists(ServerStateDir / "reserve.gc")); + CHECK_FALSE(std::filesystem::exists(ServerStateDir / ".sentry-native")); } // --------------------------------------------------------------------------- @@ -883,6 +1312,8 @@ TEST_CASE("hydration.file.concurrent") std::filesystem::path HydrationStore = TempDir.Path() / "hydration_store"; CreateDirectories(HydrationStore); + 
TestThreading Threading(8); + struct ModuleData { HydrationConfig Config; @@ -902,7 +1333,8 @@ TEST_CASE("hydration.file.concurrent") Modules[I].Config.TempDir = TempPath; Modules[I].Config.ModuleId = ModuleId; Modules[I].Config.TargetSpecification = "file://" + HydrationStore.string(); - Modules[I].Files = CreateTestTree(StateDir); + Modules[I].Config.Threading = Threading.Options; + Modules[I].Files = CreateSmallTestTree(StateDir); } // Concurrent dehydrate @@ -916,7 +1348,7 @@ TEST_CASE("hydration.file.concurrent") { Work.ScheduleWork(Pool, [&Config = Modules[I].Config](std::atomic<bool>&) { std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config); - Hydrator->Dehydrate(); + Hydrator->Dehydrate(CbObject()); }); } Work.Wait(); @@ -951,76 +1383,13 @@ TEST_CASE("hydration.file.concurrent") // --------------------------------------------------------------------------- // S3Hydrator tests // -// Each test case spawns its own local MinIO instance (self-contained, no external setup needed). +// Each test case spawns a local MinIO instance (self-contained, no external setup needed). // The MinIO binary must be present in the same directory as the test executable (copied by xmake). 
// --------------------------------------------------------------------------- TEST_CASE("hydration.s3.dehydrate_hydrate") { MinioProcessOptions MinioOpts; - MinioOpts.Port = 19010; - MinioProcess Minio(MinioOpts); - Minio.SpawnMinioServer(); - Minio.CreateBucket("zen-hydration-test"); - - ScopedEnvVar EnvAccessKey("AWS_ACCESS_KEY_ID", Minio.RootUser()); - ScopedEnvVar EnvSecretKey("AWS_SECRET_ACCESS_KEY", Minio.RootPassword()); - - ScopedTemporaryDirectory TempDir; - - std::filesystem::path ServerStateDir = TempDir.Path() / "server_state"; - std::filesystem::path HydrationTemp = TempDir.Path() / "hydration_temp"; - CreateDirectories(ServerStateDir); - CreateDirectories(HydrationTemp); - - const std::string ModuleId = "s3test_roundtrip"; - auto TestFiles = CreateTestTree(ServerStateDir); - - HydrationConfig Config; - Config.ServerStateDir = ServerStateDir; - Config.TempDir = HydrationTemp; - Config.ModuleId = ModuleId; - std::string ConfigJson = - fmt::format(R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test","endpoint":"{}","path-style":true}}}})", - Minio.Endpoint()); - std::string ParseError; - CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError); - ZEN_ASSERT(ParseError.empty() && Root.IsObject()); - Config.Options = std::move(Root).AsObject(); - - // Dehydrate: upload server state to MinIO - { - std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config); - Hydrator->Dehydrate(); - } - - // Wipe server state - CleanDirectory(ServerStateDir, true); - CHECK(std::filesystem::is_empty(ServerStateDir)); - - // Hydrate: download from MinIO back to server state - { - std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config); - Hydrator->Hydrate(); - } - - // Verify restored contents match the original - VerifyTree(ServerStateDir, TestFiles); -} - -TEST_CASE("hydration.s3.current_state_json_selects_latest_folder") -{ - // Each Dehydrate() uploads files to a new timestamp-named folder and then overwrites - // 
current-state.json to point at that folder. Old folders are NOT deleted. - // Hydrate() must read current-state.json to determine which folder to restore from. - // - // This test verifies that: - // 1. After two dehydrations, Hydrate() restores from the second snapshot, not the first, - // confirming that current-state.json was updated between dehydrations. - // 2. current-state.json is updated to point at the second (latest) folder. - // 3. Hydrate() restores the v2 snapshot (identified by v2marker.bin), NOT the v1 snapshot. - - MinioProcessOptions MinioOpts; MinioOpts.Port = 19011; MinioProcess Minio(MinioOpts); Minio.SpawnMinioServer(); @@ -1036,12 +1405,10 @@ TEST_CASE("hydration.s3.current_state_json_selects_latest_folder") CreateDirectories(ServerStateDir); CreateDirectories(HydrationTemp); - const std::string ModuleId = "s3test_folder_select"; - HydrationConfig Config; Config.ServerStateDir = ServerStateDir; Config.TempDir = HydrationTemp; - Config.ModuleId = ModuleId; + Config.ModuleId = "s3test_roundtrip"; { std::string ConfigJson = fmt::format(R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test","endpoint":"{}","path-style":true}}}})", @@ -1052,105 +1419,43 @@ TEST_CASE("hydration.s3.current_state_json_selects_latest_folder") Config.Options = std::move(Root).AsObject(); } - // v1: dehydrate without a marker file - CreateTestTree(ServerStateDir); + // Hydrate with no prior S3 state (first-boot path). Pre-populate ServerStateDir + // with a stale file to confirm the cleanup branch wipes it. + WriteFile(ServerStateDir / "stale.bin", CreateSemiRandomBlob(256)); { std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config); - Hydrator->Dehydrate(); + Hydrator->Hydrate(); } + CHECK(std::filesystem::is_empty(ServerStateDir)); - // ServerStateDir is now empty. Wait so the v2 timestamp folder name is strictly later - // (timestamp resolution is 1 ms, but macOS scheduler granularity requires a larger margin). 
- Sleep(100); + // v1: dehydrate without a marker file + CreateSmallTestTree(ServerStateDir); + { + std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config); + Hydrator->Dehydrate(CbObject()); + } // v2: dehydrate WITH a marker file that only v2 has - CreateTestTree(ServerStateDir); + CreateSmallTestTree(ServerStateDir); WriteFile(ServerStateDir / "v2marker.bin", CreateSemiRandomBlob(64)); { std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config); - Hydrator->Dehydrate(); + Hydrator->Dehydrate(CbObject()); } - // Hydrate must restore v2 (current-state.json points to the v2 folder) + // Hydrate must restore v2 (the latest dehydrated state) CleanDirectory(ServerStateDir, true); { std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config); Hydrator->Hydrate(); } - // v2 marker must be present - confirms current-state.json pointed to the v2 folder + // v2 marker must be present - confirms the second dehydration overwrote the first CHECK(std::filesystem::exists(ServerStateDir / "v2marker.bin")); - // Subdirectory hierarchy must also be intact CHECK(std::filesystem::exists(ServerStateDir / "subdir" / "file_b.bin")); CHECK(std::filesystem::exists(ServerStateDir / "subdir" / "nested" / "file_c.bin")); } -TEST_CASE("hydration.s3.module_isolation") -{ - // Two independent modules dehydrate/hydrate without interfering with each other. - // Uses VerifyTree with per-module byte content to detect cross-module data mixing. 
- MinioProcessOptions MinioOpts; - MinioOpts.Port = 19012; - MinioProcess Minio(MinioOpts); - Minio.SpawnMinioServer(); - Minio.CreateBucket("zen-hydration-test"); - - ScopedEnvVar EnvAccessKey("AWS_ACCESS_KEY_ID", Minio.RootUser()); - ScopedEnvVar EnvSecretKey("AWS_SECRET_ACCESS_KEY", Minio.RootPassword()); - - ScopedTemporaryDirectory TempDir; - - struct ModuleData - { - HydrationConfig Config; - std::vector<std::pair<std::filesystem::path, IoBuffer>> Files; - }; - - std::vector<ModuleData> Modules; - for (const char* ModuleId : {"s3test_iso_a", "s3test_iso_b"}) - { - std::filesystem::path StateDir = TempDir.Path() / ModuleId / "state"; - std::filesystem::path TempPath = TempDir.Path() / ModuleId / "temp"; - CreateDirectories(StateDir); - CreateDirectories(TempPath); - - ModuleData Data; - Data.Config.ServerStateDir = StateDir; - Data.Config.TempDir = TempPath; - Data.Config.ModuleId = ModuleId; - { - std::string ConfigJson = - fmt::format(R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test","endpoint":"{}","path-style":true}}}})", - Minio.Endpoint()); - std::string ParseError; - CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError); - ZEN_ASSERT(ParseError.empty() && Root.IsObject()); - Data.Config.Options = std::move(Root).AsObject(); - } - Data.Files = CreateTestTree(StateDir); - - std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Data.Config); - Hydrator->Dehydrate(); - - Modules.push_back(std::move(Data)); - } - - for (ModuleData& Module : Modules) - { - CleanDirectory(Module.Config.ServerStateDir, true); - std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Module.Config); - Hydrator->Hydrate(); - - // Each module's files must be independently restorable with correct byte content. - // If S3 key prefixes were mixed up, CreateSemiRandomBlob content would differ. 
- VerifyTree(Module.Config.ServerStateDir, Module.Files); - } -} - -// --------------------------------------------------------------------------- -// S3Hydrator concurrent test -// --------------------------------------------------------------------------- - TEST_CASE("hydration.s3.concurrent") { // N modules dehydrate and hydrate concurrently against MinIO. @@ -1167,6 +1472,8 @@ TEST_CASE("hydration.s3.concurrent") constexpr int kModuleCount = 16; constexpr int kThreadCount = 4; + TestThreading Threading(kThreadCount); + ScopedTemporaryDirectory TempDir; struct ModuleData @@ -1187,6 +1494,7 @@ TEST_CASE("hydration.s3.concurrent") Modules[I].Config.ServerStateDir = StateDir; Modules[I].Config.TempDir = TempPath; Modules[I].Config.ModuleId = ModuleId; + Modules[I].Config.Threading = Threading.Options; { std::string ConfigJson = fmt::format(R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test","endpoint":"{}","path-style":true}}}})", @@ -1210,7 +1518,7 @@ TEST_CASE("hydration.s3.concurrent") { Work.ScheduleWork(Pool, [&Config = Modules[I].Config](std::atomic<bool>&) { std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config); - Hydrator->Dehydrate(); + Hydrator->Dehydrate(CbObject()); }); } Work.Wait(); @@ -1243,17 +1551,10 @@ TEST_CASE("hydration.s3.concurrent") } } -// --------------------------------------------------------------------------- -// S3Hydrator: no prior state (first-boot path) -// --------------------------------------------------------------------------- - -TEST_CASE("hydration.s3.no_prior_state") +TEST_CASE("hydration.s3.config_overrides") { - // Hydrate() against an empty bucket (first-boot scenario) must leave ServerStateDir empty. - // The "No state found in S3" path goes through the error-cleanup branch, which wipes - // ServerStateDir to ensure no partial or stale content is left for the server to start on. 
MinioProcessOptions MinioOpts; - MinioOpts.Port = 19014; + MinioOpts.Port = 19015; MinioProcess Minio(MinioOpts); Minio.SpawnMinioServer(); Minio.CreateBucket("zen-hydration-test"); @@ -1268,42 +1569,82 @@ TEST_CASE("hydration.s3.no_prior_state") CreateDirectories(ServerStateDir); CreateDirectories(HydrationTemp); - // Pre-populate ServerStateDir to confirm the wipe actually runs. - WriteFile(ServerStateDir / "stale.bin", CreateSemiRandomBlob(256)); - - HydrationConfig Config; - Config.ServerStateDir = ServerStateDir; - Config.TempDir = HydrationTemp; - Config.ModuleId = "s3test_no_prior"; + // Path prefix: "s3://bucket/some/prefix" stores objects under + // "some/prefix/<ModuleId>/..." rather than directly under "<ModuleId>/...". { - std::string ConfigJson = - fmt::format(R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test","endpoint":"{}","path-style":true}}}})", - Minio.Endpoint()); - std::string ParseError; - CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError); - ZEN_ASSERT(ParseError.empty() && Root.IsObject()); - Config.Options = std::move(Root).AsObject(); + auto TestFiles = CreateSmallTestTree(ServerStateDir); + + HydrationConfig Config; + Config.ServerStateDir = ServerStateDir; + Config.TempDir = HydrationTemp; + Config.ModuleId = "s3test_prefix"; + { + std::string ConfigJson = fmt::format( + R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test/team/project","endpoint":"{}","path-style":true}}}})", + Minio.Endpoint()); + std::string ParseError; + CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError); + ZEN_ASSERT(ParseError.empty() && Root.IsObject()); + Config.Options = std::move(Root).AsObject(); + } + + { + std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config); + Hydrator->Dehydrate(CbObject()); + } + + CleanDirectory(ServerStateDir, true); + + { + std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config); + Hydrator->Hydrate(); + } + + VerifyTree(ServerStateDir, 
TestFiles); } - std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config); - Hydrator->Hydrate(); + // Region override: 'region' in Options["settings"] takes precedence over AWS_DEFAULT_REGION. + // AWS_DEFAULT_REGION is set to a bogus value; hydration must succeed using the region from Options. + { + CleanDirectory(ServerStateDir, true); + auto TestFiles = CreateSmallTestTree(ServerStateDir); - // ServerStateDir must be empty: the error path wipes it to prevent a server start - // against stale or partially-installed content. - CHECK(std::filesystem::is_empty(ServerStateDir)); -} + ScopedEnvVar EnvRegion("AWS_DEFAULT_REGION", "wrong-region"); -// --------------------------------------------------------------------------- -// S3Hydrator: bucket path prefix in TargetSpecification -// --------------------------------------------------------------------------- + HydrationConfig Config; + Config.ServerStateDir = ServerStateDir; + Config.TempDir = HydrationTemp; + Config.ModuleId = "s3test_region_override"; + { + std::string ConfigJson = fmt::format( + R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test","endpoint":"{}","path-style":true,"region":"us-east-1"}}}})", + Minio.Endpoint()); + std::string ParseError; + CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError); + ZEN_ASSERT(ParseError.empty() && Root.IsObject()); + Config.Options = std::move(Root).AsObject(); + } -TEST_CASE("hydration.s3.path_prefix") + { + std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config); + Hydrator->Dehydrate(CbObject()); + } + + CleanDirectory(ServerStateDir, true); + + { + std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config); + Hydrator->Hydrate(); + } + + VerifyTree(ServerStateDir, TestFiles); + } +} + +TEST_CASE("hydration.s3.dehydrate_hydrate.performance" * doctest::skip()) { - // TargetSpecification of the form "s3://bucket/some/prefix" stores objects under - // "some/prefix/<ModuleId>/..." 
rather than directly under "<ModuleId>/...". - // Tests the second branch of the m_KeyPrefix calculation in S3Hydrator::Configure(). MinioProcessOptions MinioOpts; - MinioOpts.Port = 19015; + MinioOpts.Port = 19010; MinioProcess Minio(MinioOpts); Minio.SpawnMinioServer(); Minio.CreateBucket("zen-hydration-test"); @@ -1318,88 +1659,287 @@ TEST_CASE("hydration.s3.path_prefix") CreateDirectories(ServerStateDir); CreateDirectories(HydrationTemp); - std::vector<std::pair<std::filesystem::path, IoBuffer>> TestFiles = CreateTestTree(ServerStateDir); + const std::string ModuleId = "s3test_performance"; + CopyTree("E:\\Dev\\hub\\brainrot\\20260402-225355-508", ServerStateDir, {.EnableClone = true}); + // auto TestFiles = CreateTestTree(ServerStateDir); + + TestThreading Threading(4); HydrationConfig Config; Config.ServerStateDir = ServerStateDir; Config.TempDir = HydrationTemp; - Config.ModuleId = "s3test_prefix"; + Config.ModuleId = ModuleId; + Config.Threading = Threading.Options; + std::string ConfigJson = + fmt::format(R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test","endpoint":"{}","path-style":true}}}})", + Minio.Endpoint()); + std::string ParseError; + CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError); + ZEN_ASSERT(ParseError.empty() && Root.IsObject()); + Config.Options = std::move(Root).AsObject(); + + // Dehydrate: upload server state to MinIO { - std::string ConfigJson = - fmt::format(R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test/team/project","endpoint":"{}","path-style":true}}}})", - Minio.Endpoint()); - std::string ParseError; - CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError); - ZEN_ASSERT(ParseError.empty() && Root.IsObject()); - Config.Options = std::move(Root).AsObject(); + ZEN_INFO("============== DEHYDRATE =============="); + std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config); + Hydrator->Dehydrate(CbObject()); } + for (size_t I = 0; I < 1; I++) { - 
std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config); - Hydrator->Dehydrate(); + // Wipe server state + CleanDirectory(ServerStateDir, true); + CHECK(std::filesystem::is_empty(ServerStateDir)); + + // Hydrate: download from MinIO back to server state + { + ZEN_INFO("=============== HYDRATE ==============="); + std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config); + Hydrator->Hydrate(); + } } +} - CleanDirectory(ServerStateDir, true); +//#define REAL_DATA_PATH "E:\\Dev\\hub\\zenddc\\Zen" +//#define REAL_DATA_PATH "E:\\Dev\\hub\\brainrot\\20260402-225355-508" + +TEST_CASE("hydration.file.incremental") +{ + std::filesystem::path TmpPath; +# ifdef REAL_DATA_PATH + TmpPath = std::filesystem::path(REAL_DATA_PATH).parent_path() / "hub"; +# endif + ScopedTemporaryDirectory TempDir(TmpPath); + + std::filesystem::path ServerStateDir = TempDir.Path() / "server_state"; + std::filesystem::path HydrationStore = TempDir.Path() / "hydration_store"; + std::filesystem::path HydrationTemp = TempDir.Path() / "hydration_temp"; + CreateDirectories(ServerStateDir); + CreateDirectories(HydrationStore); + CreateDirectories(HydrationTemp); + + const std::string ModuleId = "testmodule"; + // auto TestFiles = CreateTestTree(ServerStateDir); + + TestThreading Threading(4); + HydrationConfig Config; + Config.ServerStateDir = ServerStateDir; + Config.TempDir = HydrationTemp; + Config.ModuleId = ModuleId; + Config.TargetSpecification = "file://" + HydrationStore.string(); + Config.Threading = Threading.Options; + + std::unique_ptr<StorageBase> Storage = std::make_unique<FileStorage>(); + std::unique_ptr<HydrationStrategyBase> Hydrator = std::make_unique<IncrementalHydrator>(std::move(Storage)); + + // Hydrate with no prior state + CbObject HydrationState; { - std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config); - Hydrator->Hydrate(); + Hydrator->Configure(Config); + HydrationState = Hydrator->Hydrate(); + CHECK_FALSE(HydrationState); } 
+# ifdef REAL_DATA_PATH + ZEN_INFO("Writing state data..."); + CopyTree(REAL_DATA_PATH, ServerStateDir, {.EnableClone = true}); + ZEN_INFO("Writing state data complete"); +# else + // Create test files and dehydrate + auto TestFiles = CreateTestTree(ServerStateDir); +# endif + { + Hydrator->Configure(Config); + Hydrator->Dehydrate(HydrationState); + } + CHECK(std::filesystem::is_empty(ServerStateDir)); + + // Hydrate: restore from S3 + { + Hydrator->Configure(Config); + HydrationState = Hydrator->Hydrate(); + } +# ifndef REAL_DATA_PATH VerifyTree(ServerStateDir, TestFiles); +# endif + // Dehydrate again with cached state (should skip re-uploading unchanged files) + { + Hydrator->Configure(Config); + Hydrator->Dehydrate(HydrationState); + } + CHECK(std::filesystem::is_empty(ServerStateDir)); + + // Hydrate one more time to confirm second dehydrate produced valid state + { + Hydrator->Configure(Config); + HydrationState = Hydrator->Hydrate(); + } + + // Replace files and dehydrate + TestFiles = CreateTestTree(ServerStateDir); + { + Hydrator->Configure(Config); + Hydrator->Dehydrate(HydrationState); + } + + // Hydrate one more time to confirm second dehydrate produced valid state + { + Hydrator->Configure(Config); + HydrationState = Hydrator->Hydrate(); + } +# ifndef REAL_DATA_PATH + VerifyTree(ServerStateDir, TestFiles); +# endif // 0 + + // Dehydrate, nothing touched - no hashing, no upload + { + Hydrator->Configure(Config); + Hydrator->Dehydrate(HydrationState); + } } -TEST_CASE("hydration.s3.options_region_override") -{ - // Verify that 'region' in Options["settings"] takes precedence over AWS_DEFAULT_REGION env var. - // AWS_DEFAULT_REGION is set to a bogus value; hydration must succeed using the region from Options. 
+// --------------------------------------------------------------------------- +// S3Storage test +// --------------------------------------------------------------------------- +TEST_CASE("hydration.s3.incremental") +{ MinioProcessOptions MinioOpts; - MinioOpts.Port = 19016; + MinioOpts.Port = 19017; MinioProcess Minio(MinioOpts); Minio.SpawnMinioServer(); Minio.CreateBucket("zen-hydration-test"); ScopedEnvVar EnvAccessKey("AWS_ACCESS_KEY_ID", Minio.RootUser()); ScopedEnvVar EnvSecretKey("AWS_SECRET_ACCESS_KEY", Minio.RootPassword()); - ScopedEnvVar EnvRegion("AWS_DEFAULT_REGION", "wrong-region"); - ScopedTemporaryDirectory TempDir; + std::filesystem::path TmpPath; +# ifdef REAL_DATA_PATH + TmpPath = std::filesystem::path(REAL_DATA_PATH).parent_path() / "hub"; +# endif + ScopedTemporaryDirectory TempDir(TmpPath); std::filesystem::path ServerStateDir = TempDir.Path() / "server_state"; std::filesystem::path HydrationTemp = TempDir.Path() / "hydration_temp"; CreateDirectories(ServerStateDir); CreateDirectories(HydrationTemp); - auto TestFiles = CreateTestTree(ServerStateDir); + const std::string ModuleId = "s3test_incremental"; + + TestThreading Threading(8); HydrationConfig Config; Config.ServerStateDir = ServerStateDir; Config.TempDir = HydrationTemp; - Config.ModuleId = "s3test_region_override"; + Config.ModuleId = ModuleId; + Config.Threading = Threading.Options; { - std::string ConfigJson = fmt::format( - R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test","endpoint":"{}","path-style":true,"region":"us-east-1"}}}})", - Minio.Endpoint()); + std::string ConfigJson = + fmt::format(R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test","endpoint":"{}","path-style":true}}}})", + Minio.Endpoint()); std::string ParseError; CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError); ZEN_ASSERT(ParseError.empty() && Root.IsObject()); Config.Options = std::move(Root).AsObject(); } + std::unique_ptr<StorageBase> Storage = 
std::make_unique<S3Storage>(); + std::unique_ptr<HydrationStrategyBase> Hydrator = std::make_unique<IncrementalHydrator>(std::move(Storage)); + + // Hydrate with no prior state + CbObject HydrationState; { - std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config); - Hydrator->Dehydrate(); + Hydrator->Configure(Config); + HydrationState = Hydrator->Hydrate(); + CHECK_FALSE(HydrationState); } - CleanDirectory(ServerStateDir, true); +# ifdef REAL_DATA_PATH + ZEN_INFO("Writing state data..."); + CopyTree(REAL_DATA_PATH, ServerStateDir, {.EnableClone = true}); + ZEN_INFO("Writing state data complete"); +# else + // Create test files and dehydrate + auto TestFiles = CreateTestTree(ServerStateDir); +# endif + { + Hydrator->Configure(Config); + Hydrator->Dehydrate(HydrationState); + } + CHECK(std::filesystem::is_empty(ServerStateDir)); + // Hydrate: restore from S3 { - std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config); - Hydrator->Hydrate(); + Hydrator->Configure(Config); + HydrationState = Hydrator->Hydrate(); + } +# ifndef REAL_DATA_PATH + VerifyTree(ServerStateDir, TestFiles); +# endif + // Dehydrate again with cached state (should skip re-uploading unchanged files) + { + Hydrator->Configure(Config); + Hydrator->Dehydrate(HydrationState); + } + CHECK(std::filesystem::is_empty(ServerStateDir)); + + // Hydrate one more time to confirm second dehydrate produced valid state + { + Hydrator->Configure(Config); + HydrationState = Hydrator->Hydrate(); + } + + // Replace files and dehydrate + TestFiles = CreateTestTree(ServerStateDir); + { + Hydrator->Configure(Config); + Hydrator->Dehydrate(HydrationState); + } + + // Hydrate one more time to confirm second dehydrate produced valid state + { + Hydrator->Configure(Config); + HydrationState = Hydrator->Hydrate(); } +# ifndef REAL_DATA_PATH VerifyTree(ServerStateDir, TestFiles); +# endif // 0 + + // Dehydrate, nothing touched - no hashing, no upload + { + Hydrator->Configure(Config); + 
Hydrator->Dehydrate(HydrationState); + } +} + +TEST_CASE("hydration.create_hydrator_rejects_invalid_config") +{ + ScopedTemporaryDirectory TempDir; + + HydrationConfig Config; + Config.ServerStateDir = TempDir.Path() / "state"; + Config.TempDir = TempDir.Path() / "temp"; + Config.ModuleId = "invalid_test"; + + // Unknown TargetSpecification prefix + Config.TargetSpecification = "ftp://somewhere"; + CHECK_THROWS(CreateHydrator(Config)); + + // Unknown Options type + Config.TargetSpecification.clear(); + { + std::string ParseError; + CbFieldIterator Root = LoadCompactBinaryFromJson(R"({"type":"dynamodb"})", ParseError); + ZEN_ASSERT(ParseError.empty() && Root.IsObject()); + Config.Options = std::move(Root).AsObject(); + } + CHECK_THROWS(CreateHydrator(Config)); + + // Empty Options (no type field) + Config.Options = CbObject(); + CHECK_THROWS(CreateHydrator(Config)); } TEST_SUITE_END(); diff --git a/src/zenserver/hub/hydration.h b/src/zenserver/hub/hydration.h index 19a96c248..7edf5d996 100644 --- a/src/zenserver/hub/hydration.h +++ b/src/zenserver/hub/hydration.h @@ -5,9 +5,12 @@ #include <zencore/compactbinary.h> #include <filesystem> +#include <optional> namespace zen { +class WorkerThreadPool; + struct HydrationConfig { // Location of server state to hydrate/dehydrate @@ -20,6 +23,16 @@ struct HydrationConfig std::string TargetSpecification; // Full config object when using --hub-hydration-target-config (mutually exclusive with TargetSpecification) CbObject Options; + + struct ThreadingOptions + { + WorkerThreadPool* WorkerPool; + std::atomic<bool>* AbortFlag; + std::atomic<bool>* PauseFlag; + }; + + // External threading for parallel I/O and hashing. If not set, work runs inline on the caller's thread. + std::optional<ThreadingOptions> Threading; }; /** @@ -34,11 +47,19 @@ struct HydrationStrategyBase { virtual ~HydrationStrategyBase() = default; - virtual void Dehydrate() = 0; - virtual void Hydrate() = 0; + // Set up the hydration target from Config. 
Must be called before Hydrate/Dehydrate. virtual void Configure(const HydrationConfig& Config) = 0; + + // Upload server state to the configured target. ServerStateDir is wiped on success. + // On failure, ServerStateDir is left intact. + virtual void Dehydrate(const CbObject& CachedState) = 0; + + // Download state from the configured target into ServerStateDir. Returns cached state for the next Dehydrate. + // On failure, ServerStateDir is wiped and an empty CbObject is returned. + virtual CbObject Hydrate() = 0; }; +// Create a configured hydrator based on Config. Ready to call Hydrate/Dehydrate immediately. std::unique_ptr<HydrationStrategyBase> CreateHydrator(const HydrationConfig& Config); #if ZEN_WITH_TESTS diff --git a/src/zenserver/hub/storageserverinstance.cpp b/src/zenserver/hub/storageserverinstance.cpp index 0c9354990..6185a7f19 100644 --- a/src/zenserver/hub/storageserverinstance.cpp +++ b/src/zenserver/hub/storageserverinstance.cpp @@ -154,29 +154,43 @@ StorageServerInstance::WakeLocked() void StorageServerInstance::Hydrate() { + std::atomic<bool> AbortFlag{false}; + std::atomic<bool> PauseFlag{false}; + HydrationConfig Config{.ServerStateDir = m_BaseDir, .TempDir = m_TempDir, .ModuleId = m_ModuleId, .TargetSpecification = m_Config.HydrationTargetSpecification, .Options = m_Config.HydrationOptions}; + if (m_Config.OptionalWorkerPool) + { + Config.Threading.emplace( + HydrationConfig::ThreadingOptions{.WorkerPool = m_Config.OptionalWorkerPool, .AbortFlag = &AbortFlag, .PauseFlag = &PauseFlag}); + } std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config); - - Hydrator->Hydrate(); + m_HydrationState = Hydrator->Hydrate(); } void StorageServerInstance::Dehydrate() { + std::atomic<bool> AbortFlag{false}; + std::atomic<bool> PauseFlag{false}; + HydrationConfig Config{.ServerStateDir = m_BaseDir, .TempDir = m_TempDir, .ModuleId = m_ModuleId, .TargetSpecification = m_Config.HydrationTargetSpecification, .Options = m_Config.HydrationOptions}; 
+ if (m_Config.OptionalWorkerPool) + { + Config.Threading.emplace( + HydrationConfig::ThreadingOptions{.WorkerPool = m_Config.OptionalWorkerPool, .AbortFlag = &AbortFlag, .PauseFlag = &PauseFlag}); + } std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config); - - Hydrator->Dehydrate(); + Hydrator->Dehydrate(m_HydrationState); } StorageServerInstance::SharedLockedPtr::SharedLockedPtr() : m_Lock(nullptr), m_Instance(nullptr) diff --git a/src/zenserver/hub/storageserverinstance.h b/src/zenserver/hub/storageserverinstance.h index 1b0078d87..d8bebc48d 100644 --- a/src/zenserver/hub/storageserverinstance.h +++ b/src/zenserver/hub/storageserverinstance.h @@ -10,6 +10,8 @@ namespace zen { +class WorkerThreadPool; + /** * Storage Server Instance * @@ -29,6 +31,8 @@ public: uint32_t HttpThreadCount = 0; // Automatic int CoreLimit = 0; // Automatic std::filesystem::path ConfigPath; + + WorkerThreadPool* OptionalWorkerPool = nullptr; }; StorageServerInstance(ZenServerEnvironment& RunEnvironment, const Configuration& Config, std::string_view ModuleId); @@ -135,6 +139,8 @@ private: std::filesystem::path m_TempDir; + CbObject m_HydrationState; + #if ZEN_PLATFORM_WINDOWS JobObject* m_JobObject = nullptr; #endif diff --git a/src/zenserver/hub/zenhubserver.cpp b/src/zenserver/hub/zenhubserver.cpp index d01e5f3f2..b94e04092 100644 --- a/src/zenserver/hub/zenhubserver.cpp +++ b/src/zenserver/hub/zenhubserver.cpp @@ -14,16 +14,17 @@ #include <zencore/except_fmt.h> #include <zencore/filesystem.h> #include <zencore/fmtutils.h> +#include <zencore/intmath.h> #include <zencore/memory/llm.h> #include <zencore/memory/memorytrace.h> #include <zencore/memory/tagtrace.h> #include <zencore/scopeguard.h> #include <zencore/sentryintegration.h> #include <zencore/system.h> +#include <zencore/thread.h> #include <zencore/windows.h> #include <zenhttp/httpapiservice.h> #include <zenutil/service.h> -#include <zenutil/workerpools.h> ZEN_THIRD_PARTY_INCLUDES_START #include <cxxopts.hpp> @@ 
-152,6 +153,16 @@ ZenHubServerConfigurator::AddCliOptions(cxxopts::Options& Options) cxxopts::value(m_ServerOptions.HubInstanceConfigPath), "<instance config>"); + const uint32_t DefaultHubInstanceProvisionThreadCount = Max(GetHardwareConcurrency() / 4u, 2u); + + Options.add_option("hub", + "", + "hub-instance-provision-threads", + fmt::format("Number of threads for instance provisioning (default {})", DefaultHubInstanceProvisionThreadCount), + cxxopts::value<uint32_t>(m_ServerOptions.HubInstanceProvisionThreadCount) + ->default_value(fmt::format("{}", DefaultHubInstanceProvisionThreadCount)), + "<threads>"); + Options.add_option("hub", "", "hub-hydration-target-spec", @@ -168,6 +179,16 @@ ZenHubServerConfigurator::AddCliOptions(cxxopts::Options& Options) cxxopts::value(m_ServerOptions.HydrationTargetConfigPath), "<path>"); + const uint32_t DefaultHubHydrationThreadCount = Max(GetHardwareConcurrency() / 4u, 2u); + + Options.add_option( + "hub", + "", + "hub-hydration-threads", + fmt::format("Number of threads for hydration/dehydration (default {})", DefaultHubHydrationThreadCount), + cxxopts::value<uint32_t>(m_ServerOptions.HubHydrationThreadCount)->default_value(fmt::format("{}", DefaultHubHydrationThreadCount)), + "<threads>"); + #if ZEN_PLATFORM_WINDOWS Options.add_option("hub", "", @@ -299,9 +320,13 @@ ZenHubServerConfigurator::AddConfigOptions(LuaConfig::Options& Options) Options.AddOption("hub.instance.limits.memorylimitpercent"sv, m_ServerOptions.HubProvisionMemoryLimitPercent, "hub-provision-memory-limit-percent"sv); + Options.AddOption("hub.instance.provisionthreads"sv, + m_ServerOptions.HubInstanceProvisionThreadCount, + "hub-instance-provision-threads"sv); Options.AddOption("hub.hydration.targetspec"sv, m_ServerOptions.HydrationTargetSpecification, "hub-hydration-target-spec"sv); Options.AddOption("hub.hydration.targetconfig"sv, m_ServerOptions.HydrationTargetConfigPath, "hub-hydration-target-config"sv); + Options.AddOption("hub.hydration.threads"sv, 
m_ServerOptions.HubHydrationThreadCount, "hub-hydration-threads"sv); Options.AddOption("hub.watchdog.cycleintervalms"sv, m_ServerOptions.WatchdogConfig.CycleIntervalMs, "hub-watchdog-cycle-interval-ms"sv); Options.AddOption("hub.watchdog.cycleprocessingbudgetms"sv, @@ -468,6 +493,10 @@ ZenHubServer::Initialize(const ZenHubServerConfig& ServerConfig, ZenServerState: // the main test range. ZenServerEnvironment::SetBaseChildId(1000); + m_ProvisionWorkerPool = + std::make_unique<WorkerThreadPool>(gsl::narrow<int>(ServerConfig.HubInstanceProvisionThreadCount), "hub_provision"); + m_HydrationWorkerPool = std::make_unique<WorkerThreadPool>(gsl::narrow<int>(ServerConfig.HubHydrationThreadCount), "hub_hydration"); + m_DebugOptionForcedCrash = ServerConfig.ShouldCrash; InitializeState(ServerConfig); @@ -591,7 +620,9 @@ ZenHubServer::InitializeServices(const ZenHubServerConfig& ServerConfig) .ActivityCheckConnectTimeout = std::chrono::milliseconds(ServerConfig.WatchdogConfig.ActivityCheckConnectTimeoutMs), .ActivityCheckRequestTimeout = std::chrono::milliseconds(ServerConfig.WatchdogConfig.ActivityCheckRequestTimeoutMs), }, - .ResourceLimits = ResolveLimits(ServerConfig)}; + .ResourceLimits = ResolveLimits(ServerConfig), + .OptionalProvisionWorkerPool = m_ProvisionWorkerPool.get(), + .OptionalHydrationWorkerPool = m_HydrationWorkerPool.get()}; if (!ServerConfig.HydrationTargetConfigPath.empty()) { @@ -624,7 +655,6 @@ ZenHubServer::InitializeServices(const ZenHubServerConfig& ServerConfig) ServerConfig.DataDir / "hub", ServerConfig.DataDir / "servers", ServerConfig.HubInstanceHttpClass), - &GetMediumWorkerPool(EWorkloadType::Background), Hub::AsyncModuleStateChangeCallbackFunc{ [this, HubInstanceId = fmt::format("zen-hub-{}", ServerConfig.InstanceId)](std::string_view ModuleId, const HubProvisionedInstanceInfo& Info, diff --git a/src/zenserver/hub/zenhubserver.h b/src/zenserver/hub/zenhubserver.h index d1add7690..22791a648 100644 --- a/src/zenserver/hub/zenhubserver.h +++ 
b/src/zenserver/hub/zenhubserver.h @@ -6,6 +6,7 @@ #include "resourcemetrics.h" #include "zenserver.h" +#include <zencore/workthreadpool.h> #include <zenutil/consul.h> namespace cxxopts { @@ -40,16 +41,18 @@ struct ZenHubServerConfig : public ZenServerConfig std::string InstanceId; // For use in notifications std::string ConsulEndpoint; // If set, enables Consul service registration std::string ConsulTokenEnv; // Environment variable name to read a Consul token from; defaults to CONSUL_HTTP_TOKEN if empty - uint32_t ConsulHealthIntervalSeconds = 10; // Interval in seconds between Consul health checks - uint32_t ConsulDeregisterAfterSeconds = 30; // Seconds before Consul deregisters an unhealthy service - uint16_t HubBasePortNumber = 21000; - int HubInstanceLimit = 1000; - bool HubUseJobObject = true; - std::string HubInstanceHttpClass = "asio"; - uint32_t HubInstanceHttpThreadCount = 0; // Automatic - int HubInstanceCoreLimit = 0; // Automatic - std::filesystem::path HubInstanceConfigPath; // Path to Lua config file - std::string HydrationTargetSpecification; // hydration/dehydration target specification + uint32_t ConsulHealthIntervalSeconds = 10; // Interval in seconds between Consul health checks + uint32_t ConsulDeregisterAfterSeconds = 30; // Seconds before Consul deregisters an unhealthy service + uint16_t HubBasePortNumber = 21000; + int HubInstanceLimit = 1000; + bool HubUseJobObject = true; + std::string HubInstanceHttpClass = "asio"; + uint32_t HubInstanceHttpThreadCount = 0; // Automatic + uint32_t HubInstanceProvisionThreadCount = 0; // Synchronous provisioning + uint32_t HubHydrationThreadCount = 0; // Synchronous hydration/dehydration + int HubInstanceCoreLimit = 0; // Automatic + std::filesystem::path HubInstanceConfigPath; // Path to Lua config file + std::string HydrationTargetSpecification; // hydration/dehydration target specification std::filesystem::path HydrationTargetConfigPath; // path to JSON config file (mutually exclusive with 
HydrationTargetSpecification) ZenHubWatchdogConfig WatchdogConfig; uint64_t HubProvisionDiskLimitBytes = 0; @@ -123,6 +126,8 @@ private: bool m_DebugOptionForcedCrash = false; std::unique_ptr<HttpProxyHandler> m_Proxy; + std::unique_ptr<WorkerThreadPool> m_ProvisionWorkerPool; + std::unique_ptr<WorkerThreadPool> m_HydrationWorkerPool; std::unique_ptr<Hub> m_Hub; std::unique_ptr<HttpHubService> m_HubService; |