diff options
Diffstat (limited to 'src/zenserver/hub/hub.cpp')
| -rw-r--r-- | src/zenserver/hub/hub.cpp | 515 |
1 files changed, 358 insertions, 157 deletions
diff --git a/src/zenserver/hub/hub.cpp b/src/zenserver/hub/hub.cpp index b4e9de2f0..2afee8729 100644 --- a/src/zenserver/hub/hub.cpp +++ b/src/zenserver/hub/hub.cpp @@ -21,6 +21,10 @@ ZEN_THIRD_PARTY_INCLUDES_START #include <gsl/gsl-lite.hpp> ZEN_THIRD_PARTY_INCLUDES_END +#include <zencore/thread.h> + +#include <thread> + #if ZEN_WITH_TESTS # include <zencore/testing.h> # include <zencore/testutils.h> @@ -30,8 +34,6 @@ ZEN_THIRD_PARTY_INCLUDES_END namespace zen { -/////////////////////////////////////////////////////////////////////////// - /** * A timeline of events with sequence IDs and timestamps. Used to * track significant events for broadcasting to listeners. @@ -81,7 +83,6 @@ public: */ void IterateEventsSince(auto&& Callback, uint64_t SinceEventId) { - // Hold the lock for as short a time as possible eastl::fixed_vector<EventRecord, 128> EventsToProcess; m_Lock.WithSharedLock([&] { for (auto& Event : m_Events) @@ -93,7 +94,6 @@ public: } }); - // Now invoke the callback outside the lock for (auto& Event : EventsToProcess) { Callback(Event); @@ -176,16 +176,22 @@ Hub::GetMachineMetrics(SystemMetrics& OutSystemMetrict, DiskSpace& OutDiskSpace) Hub::Hub(const Configuration& Config, ZenServerEnvironment&& RunEnvironment, AsyncModuleStateChangeCallbackFunc&& ModuleStateChangeCallback) : m_Config(Config) , m_RunEnvironment(std::move(RunEnvironment)) -, m_WorkerPool(Config.OptionalProvisionWorkerPool) +, m_ProvisionPool(Config.OptionalProvisionPool) +, m_SpawnPool(Config.OptionalSpawnPool) , m_InstanceClientShare(std::make_unique<HttpClientShare>()) , m_BackgroundWorkLatch(1) , m_ModuleStateChangeCallback(std::move(ModuleStateChangeCallback)) , m_ActiveInstances(Config.InstanceLimit) , m_FreeActiveInstanceIndexes(Config.InstanceLimit) { - ZEN_ASSERT_FORMAT( - Config.OptionalProvisionWorkerPool != Config.OptionalHydrationWorkerPool || Config.OptionalProvisionWorkerPool == nullptr, - "Provision and hydration worker pools must be distinct to avoid deadlocks"); + ZEN_ASSERT_FORMAT((Config.OptionalProvisionPool == nullptr) == (Config.OptionalSpawnPool == nullptr), + "Provision and spawn worker pools must both be set or both be null"); + ZEN_ASSERT_FORMAT(Config.OptionalProvisionPool != Config.OptionalSpawnPool || Config.OptionalProvisionPool == nullptr, + "Provision and spawn worker pools must be distinct to avoid deadlocks"); + ZEN_ASSERT_FORMAT(Config.OptionalProvisionPool != Config.OptionalHydrationPool || Config.OptionalProvisionPool == nullptr, + "Provision and hydration worker pools must be distinct to avoid deadlocks"); + ZEN_ASSERT_FORMAT(Config.OptionalSpawnPool != Config.OptionalHydrationPool || Config.OptionalSpawnPool == nullptr, + "Spawn and hydration worker pools must be distinct to avoid deadlocks"); HydrationBase::Configuration HydrationConfig; if (!m_Config.HydrationTargetSpecification.empty()) @@ -202,6 +208,11 @@ Hub::Hub(const Configuration& Config, ZenServerEnvironment&& RunEnvironment, Asy { HydrationConfig.Options = m_Config.HydrationOptions; } + if (Config.HydrationAsyncEnabled) + { + HydrationConfig.AsyncEnabled = true; + HydrationConfig.AsyncMaxConcurrentRequests = Config.HydrationAsyncMaxConcurrentRequests; + } m_Hydration = InitHydration(HydrationConfig); m_HydrationTempPath = m_RunEnvironment.CreateChildDir("hydration_temp"); @@ -272,7 +283,7 @@ Hub::Shutdown() m_WatchDog = {}; - if (WaitForBackgroundWork && m_WorkerPool) + if (WaitForBackgroundWork && (m_ProvisionPool != nullptr || m_SpawnPool != nullptr)) { m_BackgroundWorkLatch.CountDown(); m_BackgroundWorkLatch.Wait(); @@ -300,7 +311,7 @@ Hub::Shutdown() } }); - if (WaitForBackgroundWork && m_WorkerPool) + if (WaitForBackgroundWork && (m_ProvisionPool != nullptr || m_SpawnPool != nullptr)) { m_BackgroundWorkLatch.CountDown(); m_BackgroundWorkLatch.Wait(); @@ -317,7 +328,7 @@ Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo) size_t ActiveInstanceIndex = (size_t)-1; HubInstanceState OldState = HubInstanceState::Unprovisioned; { - RwLock::ExclusiveLockScope _(m_Lock); + RwLock::ExclusiveLockScope HubLock(m_Lock); if (auto It = m_InstanceLookup.find(std::string(ModuleId)); It == m_InstanceLookup.end()) { @@ -339,23 +350,15 @@ Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo) { auto NewInstance = std::make_unique<StorageServerInstance>( m_RunEnvironment, - *m_Hydration, - StorageServerInstance::Configuration{.BasePort = GetInstanceIndexAssignedPort(ActiveInstanceIndex), - .StateDir = m_RunEnvironment.CreateChildDir(ModuleId), - .TempDir = m_HydrationTempPath / ModuleId, - .HttpThreadCount = m_Config.InstanceHttpThreadCount, - .CoreLimit = m_Config.InstanceCoreLimit, - .ConfigPath = m_Config.InstanceConfigPath, - .Malloc = m_Config.InstanceMalloc, - .Trace = m_Config.InstanceTrace, - .TraceHost = m_Config.InstanceTraceHost, - .TraceFile = m_Config.InstanceTraceFile, - .EnableHydration = m_Config.EnableHydration, - .EnableDehydration = m_Config.EnableDehydration, - .HydrationPackEnabled = m_Config.HydrationPackEnabled, - .HydrationPackThresholdBytes = m_Config.HydrationPackThresholdBytes, - .HydrationMaxPackBytes = m_Config.HydrationMaxPackBytes, - .OptionalWorkerPool = m_Config.OptionalHydrationWorkerPool}, + StorageServerInstance::Configuration{.BasePort = GetInstanceIndexAssignedPort(ActiveInstanceIndex), + .StateDir = m_RunEnvironment.CreateChildDir(ModuleId), + .HttpThreadCount = m_Config.InstanceHttpThreadCount, + .CoreLimit = m_Config.InstanceCoreLimit, + .ConfigPath = m_Config.InstanceConfigPath, + .Malloc = m_Config.InstanceMalloc, + .Trace = m_Config.InstanceTrace, + .TraceHost = m_Config.InstanceTraceHost, + .TraceFile = m_Config.InstanceTraceFile}, ModuleId); #if ZEN_PLATFORM_WINDOWS @@ -367,7 +370,8 @@ Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo) Instance = NewInstance->LockExclusive(/*Wait*/ true); - m_ActiveInstances[ActiveInstanceIndex].Instance = std::move(NewInstance); + m_ActiveInstances[ActiveInstanceIndex].Instance = std::move(NewInstance); + m_ActiveInstances[ActiveInstanceIndex].HydrationState = {}; m_ActiveInstances[ActiveInstanceIndex].ProcessMetrics.Reset(); m_InstanceLookup.insert_or_assign(std::string(ModuleId), ActiveInstanceIndex); // Set Provisioning while both hub lock and instance lock are held so that any @@ -378,7 +382,7 @@ Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo) { Instance = {}; m_ActiveInstances[ActiveInstanceIndex].Instance.reset(); - m_ActiveInstances[ActiveInstanceIndex].State.store(HubInstanceState::Unprovisioned); + UpdateInstanceState(HubLock, ActiveInstanceIndex, HubInstanceState::Unprovisioned); m_InstanceLookup.erase(std::string(ModuleId)); m_FreeActiveInstanceIndexes.push_back(ActiveInstanceIndex); throw; @@ -419,7 +423,7 @@ Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo) m_ActiveInstances[ActiveInstanceIndex].LastActivityTime.store(std::chrono::system_clock::now()); return Response{EResponseCode::Completed}; case HubInstanceState::Hibernated: - _.ReleaseNow(); + HubLock.ReleaseNow(); return Wake(std::string(ModuleId)); default: return Response{EResponseCode::Rejected, @@ -463,54 +467,54 @@ Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo) NotifyStateUpdate(ModuleId, OldState, HubInstanceState::Provisioning, OutInfo.Port, {}); - if (m_WorkerPool) + const bool Async = m_ProvisionPool != nullptr && m_SpawnPool != nullptr; + if (Async) { + auto SharedInstance = std::make_shared<StorageServerInstance::ExclusiveLockedPtr>(std::move(Instance)); m_BackgroundWorkLatch.AddCount(1); try { - m_WorkerPool->ScheduleWork( + m_ProvisionPool->ScheduleWork( [this, ModuleId = std::string(ModuleId), ActiveInstanceIndex, OldState, IsNewInstance, - Instance = std::make_shared<StorageServerInstance::ExclusiveLockedPtr>(std::move(Instance))]() { + Port = OutInfo.Port, + Instance = SharedInstance]() { auto _ = MakeGuard([this]() { m_BackgroundWorkLatch.CountDown(); }); + if (!RunProvisionPhase1(*Instance, ActiveInstanceIndex, OldState, IsNewInstance, Port)) + { + return; + } + + m_BackgroundWorkLatch.AddCount(1); try { - CompleteProvision(*Instance, ActiveInstanceIndex, OldState, IsNewInstance); + m_SpawnPool->ScheduleWork( + [this, ModuleId, ActiveInstanceIndex, OldState, IsNewInstance, Port, Instance]() { + auto _ = MakeGuard([this]() { m_BackgroundWorkLatch.CountDown(); }); + RunProvisionPhase2(*Instance, ActiveInstanceIndex, OldState, IsNewInstance, Port); + }, + WorkerThreadPool::EMode::EnableBacklog); } - catch (const std::exception& Ex) + catch (const std::exception& DispatchEx) { - ZEN_ERROR("Failed async provision of module '{}': {}", ModuleId, Ex.what()); + // Fallback: run Phase2 inline on the ProvisionPool worker. Couples pool + // lifetimes (this ProvisionPool slot now executes what should run on + // SpawnPool) but better than dropping the request. + ZEN_ERROR("Failed async dispatch of provision phase 2 for module '{}': {}", ModuleId, DispatchEx.what()); + m_BackgroundWorkLatch.CountDown(); + RunProvisionPhase2(*Instance, ActiveInstanceIndex, OldState, IsNewInstance, Port); } }, WorkerThreadPool::EMode::EnableBacklog); } catch (const std::exception& DispatchEx) { - // Dispatch failed: undo latch increment and roll back state. - ZEN_ERROR("Failed async dispatch provision of module '{}': {}", ModuleId, DispatchEx.what()); + ZEN_ERROR("Failed async dispatch of provision phase 1 for module '{}': {}", ModuleId, DispatchEx.what()); m_BackgroundWorkLatch.CountDown(); - - // dispatch failed before the lambda ran, so ActiveInstance::State is still Provisioning - NotifyStateUpdate(ModuleId, HubInstanceState::Provisioning, OldState, OutInfo.Port, {}); - - std::unique_ptr<StorageServerInstance> DestroyInstance; - { - RwLock::ExclusiveLockScope HubLock(m_Lock); - ZEN_ASSERT_SLOW(m_InstanceLookup.find(std::string(ModuleId)) != m_InstanceLookup.end()); - ZEN_ASSERT_SLOW(m_InstanceLookup.find(std::string(ModuleId))->second == ActiveInstanceIndex); - if (IsNewInstance) - { - DestroyInstance = std::move(m_ActiveInstances[ActiveInstanceIndex].Instance); - m_FreeActiveInstanceIndexes.push_back(ActiveInstanceIndex); - m_InstanceLookup.erase(std::string(ModuleId)); - } - UpdateInstanceState(HubLock, ActiveInstanceIndex, OldState); - } - DestroyInstance.reset(); - + RollbackFailedProvision(*SharedInstance, ActiveInstanceIndex, OldState, IsNewInstance, OutInfo.Port); throw; } } @@ -519,7 +523,7 @@ Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo) CompleteProvision(Instance, ActiveInstanceIndex, OldState, IsNewInstance); } - return Response{m_WorkerPool ? EResponseCode::Accepted : EResponseCode::Completed}; + return Response{Async ? EResponseCode::Accepted : EResponseCode::Completed}; } void @@ -529,38 +533,91 @@ Hub::CompleteProvision(StorageServerInstance::ExclusiveLockedPtr& Instance, bool IsNewInstance) { ZEN_TRACE_CPU("Hub::CompleteProvision"); + const uint16_t Port = Instance.GetBasePort(); + if (RunProvisionPhase1(Instance, ActiveInstanceIndex, OldState, IsNewInstance, Port)) + { + RunProvisionPhase2(Instance, ActiveInstanceIndex, OldState, IsNewInstance, Port); + } +} + +bool +Hub::RunProvisionPhase1(StorageServerInstance::ExclusiveLockedPtr& Instance, + size_t ActiveInstanceIndex, + HubInstanceState OldState, + bool IsNewInstance, + uint16_t Port) +{ + ZEN_TRACE_CPU("Hub::RunProvisionPhase1"); const std::string ModuleId(Instance.GetModuleId()); - const uint16_t Port = Instance.GetBasePort(); - std::string BaseUri; // TODO? - if (m_ShutdownFlag.load() == false) + if (m_ShutdownFlag.load()) { - try - { - switch (OldState) - { - case HubInstanceState::Crashed: - case HubInstanceState::Unprovisioned: - Instance.Provision(); - break; - case HubInstanceState::Hibernated: - ZEN_ASSERT(false); // unreachable: Provision redirects Hibernated->Wake before setting Provisioning - break; - default: - ZEN_ASSERT(false); - } - UpdateInstanceState(Instance, ActiveInstanceIndex, HubInstanceState::Provisioned); - NotifyStateUpdate(ModuleId, HubInstanceState::Provisioning, HubInstanceState::Provisioned, Port, BaseUri); - Instance = {}; - return; - } - catch (const std::exception& Ex) + RollbackFailedProvision(Instance, ActiveInstanceIndex, OldState, IsNewInstance, Port); + return false; + } + + try + { + switch (OldState) { - ZEN_ERROR("Failed to provision storage server instance for module '{}': {}", ModuleId, Ex.what()); - // Instance will be notified and removed below. + case HubInstanceState::Crashed: + case HubInstanceState::Unprovisioned: + HydrateInstance(ActiveInstanceIndex, ModuleId); + break; + case HubInstanceState::Hibernated: + ZEN_ASSERT(false); // unreachable: Provision redirects Hibernated->Wake before setting Provisioning + break; + default: + ZEN_ASSERT(false); } + return true; + } + catch (const std::exception& Ex) + { + ZEN_ERROR("Failed to hydrate storage server instance for module '{}': {}", ModuleId, Ex.what()); + RollbackFailedProvision(Instance, ActiveInstanceIndex, OldState, IsNewInstance, Port); + return false; + } +} + +void +Hub::RunProvisionPhase2(StorageServerInstance::ExclusiveLockedPtr& Instance, + size_t ActiveInstanceIndex, + HubInstanceState OldState, + bool IsNewInstance, + uint16_t Port) +{ + ZEN_TRACE_CPU("Hub::RunProvisionPhase2"); + const std::string ModuleId(Instance.GetModuleId()); + + if (m_ShutdownFlag.load()) + { + RollbackFailedProvision(Instance, ActiveInstanceIndex, OldState, IsNewInstance, Port); + return; + } + + try + { + Instance.Provision(); + UpdateInstanceState(Instance, ActiveInstanceIndex, HubInstanceState::Provisioned); + NotifyStateUpdate(ModuleId, HubInstanceState::Provisioning, HubInstanceState::Provisioned, Port, {}); + Instance = {}; + } + catch (const std::exception& Ex) + { + ZEN_ERROR("Failed to provision storage server instance for module '{}': {}", ModuleId, Ex.what()); + RollbackFailedProvision(Instance, ActiveInstanceIndex, OldState, IsNewInstance, Port); } +} +void +Hub::RollbackFailedProvision(StorageServerInstance::ExclusiveLockedPtr& Instance, + size_t ActiveInstanceIndex, + HubInstanceState OldState, + bool IsNewInstance, + uint16_t Port) +{ + const std::string ModuleId(Instance.GetModuleId()); if (IsNewInstance) { NotifyStateUpdate(ModuleId, HubInstanceState::Provisioning, HubInstanceState::Unprovisioned, Port, {}); @@ -568,11 +625,11 @@ Hub::CompleteProvision(StorageServerInstance::ExclusiveLockedPtr& Instance, std::unique_ptr<StorageServerInstance> DestroyInstance; { RwLock::ExclusiveLockScope HubLock(m_Lock); - ZEN_ASSERT_SLOW(m_InstanceLookup.find(std::string(ModuleId)) != m_InstanceLookup.end()); - ZEN_ASSERT_SLOW(m_InstanceLookup.find(std::string(ModuleId))->second == ActiveInstanceIndex); + ZEN_ASSERT_SLOW(m_InstanceLookup.find(ModuleId) != m_InstanceLookup.end()); + ZEN_ASSERT_SLOW(m_InstanceLookup.find(ModuleId)->second == ActiveInstanceIndex); DestroyInstance = std::move(m_ActiveInstances[ActiveInstanceIndex].Instance); m_FreeActiveInstanceIndexes.push_back(ActiveInstanceIndex); - m_InstanceLookup.erase(std::string(ModuleId)); + m_InstanceLookup.erase(ModuleId); UpdateInstanceState(HubLock, ActiveInstanceIndex, HubInstanceState::Unprovisioned); } DestroyInstance.reset(); @@ -652,11 +709,7 @@ Hub::InternalDeprovision(const std::string& ModuleId, std::function<bool(ActiveI } } - // NOTE: done while not holding the hub lock, to avoid blocking other operations. - // The exclusive instance lock acquired above prevents concurrent LockExclusive callers - // from modifying instance state. The state transition to Deprovisioning happens below, - // after the hub lock is released. - + // Outside hub lock: exclusive instance lock above blocks concurrent state mutation. See Provision for the locking argument. ZEN_ASSERT(Instance); ZEN_ASSERT(ActiveInstanceIndex != (size_t)-1); @@ -664,32 +717,55 @@ Hub::InternalDeprovision(const std::string& ModuleId, std::function<bool(ActiveI const uint16_t Port = Instance.GetBasePort(); NotifyStateUpdate(ModuleId, OldState, HubInstanceState::Deprovisioning, Port, {}); - if (m_WorkerPool) + const bool Async = m_ProvisionPool != nullptr && m_SpawnPool != nullptr; + if (Async) { - std::shared_ptr<StorageServerInstance::ExclusiveLockedPtr> SharedInstancePtr = - std::make_shared<StorageServerInstance::ExclusiveLockedPtr>(std::move(Instance)); - + auto SharedInstance = std::make_shared<StorageServerInstance::ExclusiveLockedPtr>(std::move(Instance)); m_BackgroundWorkLatch.AddCount(1); try { - m_WorkerPool->ScheduleWork( - [this, ModuleId = std::string(ModuleId), ActiveInstanceIndex, Instance = std::move(SharedInstancePtr), OldState]() mutable { + m_SpawnPool->ScheduleWork( + [this, ModuleId = std::string(ModuleId), ActiveInstanceIndex, OldState, Port, Instance = SharedInstance]() { auto _ = MakeGuard([this]() { m_BackgroundWorkLatch.CountDown(); }); try { - CompleteDeprovision(*Instance, ActiveInstanceIndex, OldState); + RunDeprovisionPhase1(*Instance, ActiveInstanceIndex, OldState, Port); } catch (const std::exception& Ex) { - ZEN_ERROR("Failed async deprovision of module '{}': {}", ModuleId, Ex.what()); + // Phase1 transitions the instance to Crashed before rethrowing, + // so the watchdog will pick this up via AttemptRecoverInstance. + // The deprovision request silently morphs into a recovery cycle; + // caller already saw EResponseCode::Accepted. + ZEN_ERROR("Failed async deprovision phase 1 of module '{}': {}", ModuleId, Ex.what()); + return; + } + + m_BackgroundWorkLatch.AddCount(1); + try + { + m_ProvisionPool->ScheduleWork( + [this, ModuleId, ActiveInstanceIndex, Port, Instance]() { + auto _ = MakeGuard([this]() { m_BackgroundWorkLatch.CountDown(); }); + RunDeprovisionPhase2(*Instance, ActiveInstanceIndex, Port); + }, + WorkerThreadPool::EMode::EnableBacklog); + } + catch (const std::exception& DispatchEx) + { + // Fallback: run Phase2 inline on the SpawnPool worker. Couples pool + // lifetimes (this SpawnPool slot now executes what should run on + // ProvisionPool) but better than dropping the request. + ZEN_ERROR("Failed async dispatch of deprovision phase 2 for module '{}': {}", ModuleId, DispatchEx.what()); + m_BackgroundWorkLatch.CountDown(); + RunDeprovisionPhase2(*Instance, ActiveInstanceIndex, Port); } }, WorkerThreadPool::EMode::EnableBacklog); } catch (const std::exception& DispatchEx) { - // Dispatch failed: undo latch increment and roll back state. - ZEN_ERROR("Failed async dispatch deprovision of module '{}': {}", ModuleId, DispatchEx.what()); + ZEN_ERROR("Failed async dispatch of deprovision phase 1 for module '{}': {}", ModuleId, DispatchEx.what()); m_BackgroundWorkLatch.CountDown(); NotifyStateUpdate(ModuleId, HubInstanceState::Deprovisioning, OldState, Port, {}); @@ -708,7 +784,7 @@ Hub::InternalDeprovision(const std::string& ModuleId, std::function<bool(ActiveI CompleteDeprovision(Instance, ActiveInstanceIndex, OldState); } - return Response{m_WorkerPool ? EResponseCode::Accepted : EResponseCode::Completed}; + return Response{Async ? EResponseCode::Accepted : EResponseCode::Completed}; } Hub::Response @@ -766,12 +842,12 @@ Hub::Obliterate(const std::string& ModuleId) m_ObliteratingInstances.insert(ModuleId); Lock.ReleaseNow(); - if (m_WorkerPool) + if (m_ProvisionPool != nullptr) { m_BackgroundWorkLatch.AddCount(1); try { - m_WorkerPool->ScheduleWork( + m_ProvisionPool->ScheduleWork( [this, ModuleId = std::string(ModuleId)]() { auto Guard = MakeGuard([this, ModuleId]() { m_Lock.WithExclusiveLock([this, ModuleId]() { m_ObliteratingInstances.erase(ModuleId); }); @@ -817,31 +893,51 @@ Hub::Obliterate(const std::string& ModuleId) const uint16_t Port = Instance.GetBasePort(); NotifyStateUpdate(ModuleId, OldState, HubInstanceState::Obliterating, Port, {}); - if (m_WorkerPool) + const bool Async = m_ProvisionPool != nullptr && m_SpawnPool != nullptr; + if (Async) { - std::shared_ptr<StorageServerInstance::ExclusiveLockedPtr> SharedInstancePtr = - std::make_shared<StorageServerInstance::ExclusiveLockedPtr>(std::move(Instance)); - + auto SharedInstance = std::make_shared<StorageServerInstance::ExclusiveLockedPtr>(std::move(Instance)); m_BackgroundWorkLatch.AddCount(1); try { - m_WorkerPool->ScheduleWork( - [this, ModuleId = std::string(ModuleId), ActiveInstanceIndex, Instance = std::move(SharedInstancePtr)]() mutable { + m_SpawnPool->ScheduleWork( + [this, ModuleId = std::string(ModuleId), ActiveInstanceIndex, Port, Instance = SharedInstance]() { auto _ = MakeGuard([this]() { m_BackgroundWorkLatch.CountDown(); }); try { - CompleteObliterate(*Instance, ActiveInstanceIndex); + RunObliteratePhase1(*Instance, ActiveInstanceIndex, Port); } catch (const std::exception& Ex) { - ZEN_ERROR("Failed async obliterate of module '{}': {}", ModuleId, Ex.what()); + ZEN_ERROR("Failed async obliterate phase 1 of module '{}': {}", ModuleId, Ex.what()); + return; + } + + m_BackgroundWorkLatch.AddCount(1); + try + { + m_ProvisionPool->ScheduleWork( + [this, ModuleId, ActiveInstanceIndex, Port, Instance]() { + auto _ = MakeGuard([this]() { m_BackgroundWorkLatch.CountDown(); }); + RunObliteratePhase2(*Instance, ActiveInstanceIndex, Port); + }, + WorkerThreadPool::EMode::EnableBacklog); + } + catch (const std::exception& DispatchEx) + { + // Fallback: run Phase2 inline on the SpawnPool worker. Couples pool + // lifetimes (this SpawnPool slot now executes what should run on + // ProvisionPool) but better than dropping the request. + ZEN_ERROR("Failed async dispatch of obliterate phase 2 for module '{}': {}", ModuleId, DispatchEx.what()); + m_BackgroundWorkLatch.CountDown(); + RunObliteratePhase2(*Instance, ActiveInstanceIndex, Port); } }, WorkerThreadPool::EMode::EnableBacklog); } catch (const std::exception& DispatchEx) { - ZEN_ERROR("Failed async dispatch obliterate of module '{}': {}", ModuleId, DispatchEx.what()); + ZEN_ERROR("Failed async dispatch obliterate phase 1 of module '{}': {}", ModuleId, DispatchEx.what()); m_BackgroundWorkLatch.CountDown(); NotifyStateUpdate(ModuleId, HubInstanceState::Obliterating, OldState, Port, {}); @@ -860,15 +956,23 @@ Hub::Obliterate(const std::string& ModuleId) CompleteObliterate(Instance, ActiveInstanceIndex); } - return Response{m_WorkerPool ? EResponseCode::Accepted : EResponseCode::Completed}; + return Response{Async ? EResponseCode::Accepted : EResponseCode::Completed}; } void Hub::CompleteObliterate(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex) { ZEN_TRACE_CPU("Hub::CompleteObliterate"); + const uint16_t Port = Instance.GetBasePort(); + RunObliteratePhase1(Instance, ActiveInstanceIndex, Port); + RunObliteratePhase2(Instance, ActiveInstanceIndex, Port); +} + +void +Hub::RunObliteratePhase1(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex, uint16_t Port) +{ + ZEN_TRACE_CPU("Hub::RunObliteratePhase1"); const std::string ModuleId(Instance.GetModuleId()); - const uint16_t Port = Instance.GetBasePort(); try { @@ -876,15 +980,35 @@ Hub::CompleteObliterate(StorageServerInstance::ExclusiveLockedPtr& Instance, siz } catch (const std::exception& Ex) { + // Best-effort cleanup: drop tracking and mark Unprovisioned. Transitioning to + // Crashed would let the watchdog re-provision a module the operator wanted gone. ZEN_ERROR("Failed to obliterate storage server instance for module '{}': {}", ModuleId, Ex.what()); - Instance = {}; - { - RwLock::ExclusiveLockScope HubLock(m_Lock); - UpdateInstanceState(HubLock, ActiveInstanceIndex, HubInstanceState::Crashed); - } - NotifyStateUpdate(ModuleId, HubInstanceState::Obliterating, HubInstanceState::Crashed, Port, {}); + NotifyStateUpdate(ModuleId, HubInstanceState::Obliterating, HubInstanceState::Unprovisioned, Port, {}); + RemoveInstance(Instance, ActiveInstanceIndex, ModuleId); throw; } +} + +void +Hub::RunObliteratePhase2(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex, uint16_t Port) +{ + ZEN_TRACE_CPU("Hub::RunObliteratePhase2"); + const std::string ModuleId(Instance.GetModuleId()); + + try + { + ObliterateBackendData(ModuleId); + } + catch (const std::exception& Ex) + { + // Backend delete failed - documented leak path (see hydration.cpp Obliterate + // retry-then-fail). Drop local tracking and mark Unprovisioned; the watchdog + // must not re-provision a module the operator wanted gone. + ZEN_ERROR("Failed to obliterate backend data for module '{}': {}", ModuleId, Ex.what()); + NotifyStateUpdate(ModuleId, HubInstanceState::Obliterating, HubInstanceState::Unprovisioned, Port, {}); + RemoveInstance(Instance, ActiveInstanceIndex, ModuleId); + return; + } NotifyStateUpdate(ModuleId, HubInstanceState::Obliterating, HubInstanceState::Unprovisioned, Port, {}); RemoveInstance(Instance, ActiveInstanceIndex, ModuleId); @@ -894,8 +1018,19 @@ void Hub::CompleteDeprovision(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex, HubInstanceState OldState) { ZEN_TRACE_CPU("Hub::CompleteDeprovision"); + const uint16_t Port = Instance.GetBasePort(); + RunDeprovisionPhase1(Instance, ActiveInstanceIndex, OldState, Port); + RunDeprovisionPhase2(Instance, ActiveInstanceIndex, Port); +} + +void +Hub::RunDeprovisionPhase1(StorageServerInstance::ExclusiveLockedPtr& Instance, + size_t ActiveInstanceIndex, + HubInstanceState OldState, + uint16_t Port) +{ + ZEN_TRACE_CPU("Hub::RunDeprovisionPhase1"); const std::string ModuleId(Instance.GetModuleId()); - const uint16_t Port = Instance.GetBasePort(); try { @@ -942,9 +1077,8 @@ Hub::CompleteDeprovision(StorageServerInstance::ExclusiveLockedPtr& Instance, si catch (const std::exception& Ex) { ZEN_ERROR("Failed to deprovision storage server instance for module '{}': {}", ModuleId, Ex.what()); - // Effectively unreachable: Shutdown() never throws and Dehydrate() failures are swallowed - // by DeprovisionLocked. Kept as a safety net; if somehow reached, transition to Crashed - // so the watchdog can attempt recovery. + // GcClient HTTP calls and Instance.Deprovision can throw on transport failure + // or watchdog races; transition to Crashed so the watchdog can attempt recovery. Instance = {}; { RwLock::ExclusiveLockScope HubLock(m_Lock); @@ -953,6 +1087,22 @@ Hub::CompleteDeprovision(StorageServerInstance::ExclusiveLockedPtr& Instance, si NotifyStateUpdate(ModuleId, HubInstanceState::Deprovisioning, HubInstanceState::Crashed, Port, {}); throw; } +} + +void +Hub::RunDeprovisionPhase2(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex, uint16_t Port) +{ + ZEN_TRACE_CPU("Hub::RunDeprovisionPhase2"); + const std::string ModuleId(Instance.GetModuleId()); + + try + { + DehydrateInstance(ActiveInstanceIndex, ModuleId); + } + catch (const std::exception& Ex) + { + ZEN_WARN("Dehydration of module {} failed during deprovisioning, current state not saved. Reason: {}", ModuleId, Ex.what()); + } NotifyStateUpdate(ModuleId, HubInstanceState::Deprovisioning, HubInstanceState::Unprovisioned, Port, {}); RemoveInstance(Instance, ActiveInstanceIndex, ModuleId); @@ -1012,11 +1162,7 @@ Hub::Hibernate(const std::string& ModuleId) } } - // NOTE: done while not holding the hub lock, to avoid blocking other operations. - // Any concurrent caller that acquired the hub lock and saw Provisioned will now block on - // LockExclusive(Wait=true); by the time it acquires the lock, UpdateInstanceState below - // will have already changed the state and the re-validate above will reject it. - + // Outside hub lock: re-validate after re-locking in worker rejects races. See Provision for the locking argument. ZEN_ASSERT(Instance); ZEN_ASSERT(ActiveInstanceIndex != (size_t)-1); @@ -1024,12 +1170,12 @@ Hub::Hibernate(const std::string& ModuleId) const uint16_t Port = Instance.GetBasePort(); NotifyStateUpdate(ModuleId, OldState, HubInstanceState::Hibernating, Port, {}); - if (m_WorkerPool) + if (m_SpawnPool != nullptr) { m_BackgroundWorkLatch.AddCount(1); try { - m_WorkerPool->ScheduleWork( + m_SpawnPool->ScheduleWork( [this, ModuleId = std::string(ModuleId), ActiveInstanceIndex, @@ -1069,7 +1215,7 @@ Hub::Hibernate(const std::string& ModuleId) CompleteHibernate(Instance, ActiveInstanceIndex, OldState); } - return Response{m_WorkerPool ? EResponseCode::Accepted : EResponseCode::Completed}; + return Response{m_SpawnPool != nullptr ? EResponseCode::Accepted : EResponseCode::Completed}; } void @@ -1148,11 +1294,7 @@ Hub::Wake(const std::string& ModuleId) } } - // NOTE: done while not holding the hub lock, to avoid blocking other operations. - // Any concurrent caller that acquired the hub lock and saw Hibernated will now block on - // LockExclusive(Wait=true); by the time it acquires the lock, UpdateInstanceState below - // will have already changed the state and the re-validate above will reject it. - + // Outside hub lock: re-validate after re-locking in worker rejects races. See Provision for the locking argument. ZEN_ASSERT(Instance); ZEN_ASSERT(ActiveInstanceIndex != (size_t)-1); @@ -1160,12 +1302,12 @@ Hub::Wake(const std::string& ModuleId) const uint16_t Port = Instance.GetBasePort(); NotifyStateUpdate(ModuleId, OldState, HubInstanceState::Waking, Port, {}); - if (m_WorkerPool) + if (m_SpawnPool != nullptr) { m_BackgroundWorkLatch.AddCount(1); try { - m_WorkerPool->ScheduleWork( + m_SpawnPool->ScheduleWork( [this, ModuleId = std::string(ModuleId), ActiveInstanceIndex, @@ -1205,7 +1347,7 @@ Hub::Wake(const std::string& ModuleId) CompleteWake(Instance, ActiveInstanceIndex, OldState); } - return Response{m_WorkerPool ? EResponseCode::Accepted : EResponseCode::Completed}; + return Response{m_SpawnPool != nullptr ? EResponseCode::Accepted : EResponseCode::Completed}; } void @@ -1243,7 +1385,8 @@ Hub::RemoveInstance(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t auto It = m_InstanceLookup.find(std::string(ModuleId)); ZEN_ASSERT_SLOW(It != m_InstanceLookup.end()); ZEN_ASSERT_SLOW(It->second == ActiveInstanceIndex); - DeleteInstance = std::move(m_ActiveInstances[ActiveInstanceIndex].Instance); + DeleteInstance = std::move(m_ActiveInstances[ActiveInstanceIndex].Instance); + m_ActiveInstances[ActiveInstanceIndex].HydrationState = {}; m_FreeActiveInstanceIndexes.push_back(ActiveInstanceIndex); m_InstanceLookup.erase(It); UpdateInstanceState(HubLock, ActiveInstanceIndex, HubInstanceState::Unprovisioned); @@ -1251,23 +1394,64 @@ Hub::RemoveInstance(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t DeleteInstance.reset(); } +HydrationConfig +Hub::MakeHydrationConfigForModule(std::string_view ModuleId, std::atomic<bool>& AbortFlag, std::atomic<bool>& PauseFlag) const +{ + HydrationConfig Config{.ServerStateDir = m_RunEnvironment.GetChildBaseDir() / ModuleId, + .TempDir = m_HydrationTempPath / ModuleId, + .ModuleId = std::string(ModuleId)}; + if (m_Config.OptionalHydrationPool) + { + Config.Threading.emplace(HydrationConfig::ThreadingOptions{.WorkerPool = m_Config.OptionalHydrationPool, + .AbortFlag = &AbortFlag, + .PauseFlag = &PauseFlag}); + } + Config.PackEnabled = m_Config.HydrationPackEnabled; + Config.PackThresholdBytes = m_Config.HydrationPackThresholdBytes; + Config.MaxPackBytes = m_Config.HydrationMaxPackBytes; + return Config; +} + void -Hub::ObliterateBackendData(std::string_view ModuleId) +Hub::HydrateInstance(size_t ActiveInstanceIndex, std::string_view ModuleId) { - std::filesystem::path ServerStateDir = m_RunEnvironment.GetChildBaseDir() / ModuleId; - std::filesystem::path TempDir = m_HydrationTempPath / ModuleId; + if (!m_Config.EnableHydration) + { + ZEN_INFO("Hydration disabled; skipping hydrate for module '{}'", ModuleId); + return; + } + ZEN_TRACE_CPU("Hub::HydrateInstance"); - std::atomic<bool> AbortFlag{false}; - std::atomic<bool> PauseFlag{false}; + std::atomic<bool> AbortFlag{false}; + std::atomic<bool> PauseFlag{false}; + HydrationConfig Config = MakeHydrationConfigForModule(ModuleId, AbortFlag, PauseFlag); + std::unique_ptr<HydrationStrategyBase> Hydrator = m_Hydration->CreateHydrator(Config); + m_ActiveInstances[ActiveInstanceIndex].HydrationState = Hydrator->Hydrate(); +} - HydrationConfig Config{.ServerStateDir = ServerStateDir, .TempDir = TempDir, .ModuleId = std::string(ModuleId)}; - if (m_Config.OptionalHydrationWorkerPool) +void +Hub::DehydrateInstance(size_t ActiveInstanceIndex, std::string_view ModuleId) +{ + if (!m_Config.EnableDehydration) { - Config.Threading.emplace(HydrationConfig::ThreadingOptions{.WorkerPool = m_Config.OptionalHydrationWorkerPool, - .AbortFlag = &AbortFlag, - .PauseFlag = &PauseFlag}); + ZEN_INFO("Dehydration disabled; skipping dehydrate for module '{}'", ModuleId); + return; } + ZEN_TRACE_CPU("Hub::DehydrateInstance"); + std::atomic<bool> AbortFlag{false}; + std::atomic<bool> PauseFlag{false}; + HydrationConfig Config = MakeHydrationConfigForModule(ModuleId, AbortFlag, PauseFlag); + std::unique_ptr<HydrationStrategyBase> Hydrator = m_Hydration->CreateHydrator(Config); + Hydrator->Dehydrate(m_ActiveInstances[ActiveInstanceIndex].HydrationState); +} + +void +Hub::ObliterateBackendData(std::string_view ModuleId) +{ + std::atomic<bool> AbortFlag{false}; + std::atomic<bool> PauseFlag{false}; + HydrationConfig Config = MakeHydrationConfigForModule(ModuleId, AbortFlag, PauseFlag); std::unique_ptr<HydrationStrategyBase> Hydrator = m_Hydration->CreateHydrator(Config); Hydrator->Obliterate(); } @@ -1478,6 +1662,16 @@ Hub::AttemptRecoverInstance(std::string_view ModuleId) try { Instance.Deprovision(); + try + { + DehydrateInstance(ActiveInstanceIndex, ModuleId); + } + catch (const std::exception& Ex) + { + ZEN_WARN("Dehydration of module {} failed during crash recovery cleanup, current state not saved. Reason: {}", + ModuleId, + Ex.what()); + } } catch (const std::exception& Ex) { @@ -1502,6 +1696,7 @@ Hub::AttemptRecoverInstance(std::string_view ModuleId) try { + HydrateInstance(ActiveInstanceIndex, ModuleId); Instance.Provision(); UpdateInstanceState(Instance, ActiveInstanceIndex, HubInstanceState::Provisioned); NotifyStateUpdate(ModuleId, HubInstanceState::Recovering, HubInstanceState::Provisioned, Instance.GetBasePort(), /*BaseUri*/ {}); @@ -1795,7 +1990,6 @@ Hub::WatchDog() } catch (const std::exception& Ex) { - // TODO: Catch specific errors such as asserts, OOM, OOD, system_error etc ZEN_ERROR("Hub watchdog threw exception: {}", Ex.what()); } } @@ -1835,9 +2029,15 @@ namespace hub_testutils { struct TestHubPools { WorkerThreadPool ProvisionPool; + WorkerThreadPool SpawnPool; WorkerThreadPool HydrationPool; - explicit TestHubPools(int ThreadCount) : ProvisionPool(ThreadCount, "hub_test_prov"), HydrationPool(ThreadCount, "hub_test_hydr") {} + explicit TestHubPools(int ThreadCount) + : ProvisionPool(ThreadCount, "hub_test_provision") + , SpawnPool(ThreadCount, "hub_test_spawn") + , HydrationPool(ThreadCount, "hub_test_hydr") + { + } }; ZenServerEnvironment MakeHubEnvironment(const std::filesystem::path& BaseDir) @@ -1852,8 +2052,9 @@ namespace hub_testutils { { if (Pools) { - Config.OptionalProvisionWorkerPool = &Pools->ProvisionPool; - Config.OptionalHydrationWorkerPool = &Pools->HydrationPool; + Config.OptionalProvisionPool = &Pools->ProvisionPool; + Config.OptionalSpawnPool = &Pools->SpawnPool; + Config.OptionalHydrationPool = &Pools->HydrationPool; } return std::make_unique<Hub>(Config, MakeHubEnvironment(BaseDir), std::move(StateChangeCallback)); } |