aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver/hub/hub.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/zenserver/hub/hub.cpp')
-rw-r--r--src/zenserver/hub/hub.cpp515
1 files changed, 358 insertions, 157 deletions
diff --git a/src/zenserver/hub/hub.cpp b/src/zenserver/hub/hub.cpp
index b4e9de2f0..2afee8729 100644
--- a/src/zenserver/hub/hub.cpp
+++ b/src/zenserver/hub/hub.cpp
@@ -21,6 +21,10 @@ ZEN_THIRD_PARTY_INCLUDES_START
#include <gsl/gsl-lite.hpp>
ZEN_THIRD_PARTY_INCLUDES_END
+#include <zencore/thread.h>
+
+#include <thread>
+
#if ZEN_WITH_TESTS
# include <zencore/testing.h>
# include <zencore/testutils.h>
@@ -30,8 +34,6 @@ ZEN_THIRD_PARTY_INCLUDES_END
namespace zen {
-///////////////////////////////////////////////////////////////////////////
-
/**
* A timeline of events with sequence IDs and timestamps. Used to
* track significant events for broadcasting to listeners.
@@ -81,7 +83,6 @@ public:
*/
void IterateEventsSince(auto&& Callback, uint64_t SinceEventId)
{
- // Hold the lock for as short a time as possible
eastl::fixed_vector<EventRecord, 128> EventsToProcess;
m_Lock.WithSharedLock([&] {
for (auto& Event : m_Events)
@@ -93,7 +94,6 @@ public:
}
});
- // Now invoke the callback outside the lock
for (auto& Event : EventsToProcess)
{
Callback(Event);
@@ -176,16 +176,22 @@ Hub::GetMachineMetrics(SystemMetrics& OutSystemMetrict, DiskSpace& OutDiskSpace)
Hub::Hub(const Configuration& Config, ZenServerEnvironment&& RunEnvironment, AsyncModuleStateChangeCallbackFunc&& ModuleStateChangeCallback)
: m_Config(Config)
, m_RunEnvironment(std::move(RunEnvironment))
-, m_WorkerPool(Config.OptionalProvisionWorkerPool)
+, m_ProvisionPool(Config.OptionalProvisionPool)
+, m_SpawnPool(Config.OptionalSpawnPool)
, m_InstanceClientShare(std::make_unique<HttpClientShare>())
, m_BackgroundWorkLatch(1)
, m_ModuleStateChangeCallback(std::move(ModuleStateChangeCallback))
, m_ActiveInstances(Config.InstanceLimit)
, m_FreeActiveInstanceIndexes(Config.InstanceLimit)
{
- ZEN_ASSERT_FORMAT(
- Config.OptionalProvisionWorkerPool != Config.OptionalHydrationWorkerPool || Config.OptionalProvisionWorkerPool == nullptr,
- "Provision and hydration worker pools must be distinct to avoid deadlocks");
+ ZEN_ASSERT_FORMAT((Config.OptionalProvisionPool == nullptr) == (Config.OptionalSpawnPool == nullptr),
+ "Provision and spawn worker pools must both be set or both be null");
+ ZEN_ASSERT_FORMAT(Config.OptionalProvisionPool != Config.OptionalSpawnPool || Config.OptionalProvisionPool == nullptr,
+ "Provision and spawn worker pools must be distinct to avoid deadlocks");
+ ZEN_ASSERT_FORMAT(Config.OptionalProvisionPool != Config.OptionalHydrationPool || Config.OptionalProvisionPool == nullptr,
+ "Provision and hydration worker pools must be distinct to avoid deadlocks");
+ ZEN_ASSERT_FORMAT(Config.OptionalSpawnPool != Config.OptionalHydrationPool || Config.OptionalSpawnPool == nullptr,
+ "Spawn and hydration worker pools must be distinct to avoid deadlocks");
HydrationBase::Configuration HydrationConfig;
if (!m_Config.HydrationTargetSpecification.empty())
@@ -202,6 +208,11 @@ Hub::Hub(const Configuration& Config, ZenServerEnvironment&& RunEnvironment, Asy
{
HydrationConfig.Options = m_Config.HydrationOptions;
}
+ if (Config.HydrationAsyncEnabled)
+ {
+ HydrationConfig.AsyncEnabled = true;
+ HydrationConfig.AsyncMaxConcurrentRequests = Config.HydrationAsyncMaxConcurrentRequests;
+ }
m_Hydration = InitHydration(HydrationConfig);
m_HydrationTempPath = m_RunEnvironment.CreateChildDir("hydration_temp");
@@ -272,7 +283,7 @@ Hub::Shutdown()
m_WatchDog = {};
- if (WaitForBackgroundWork && m_WorkerPool)
+ if (WaitForBackgroundWork && (m_ProvisionPool != nullptr || m_SpawnPool != nullptr))
{
m_BackgroundWorkLatch.CountDown();
m_BackgroundWorkLatch.Wait();
@@ -300,7 +311,7 @@ Hub::Shutdown()
}
});
- if (WaitForBackgroundWork && m_WorkerPool)
+ if (WaitForBackgroundWork && (m_ProvisionPool != nullptr || m_SpawnPool != nullptr))
{
m_BackgroundWorkLatch.CountDown();
m_BackgroundWorkLatch.Wait();
@@ -317,7 +328,7 @@ Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo)
size_t ActiveInstanceIndex = (size_t)-1;
HubInstanceState OldState = HubInstanceState::Unprovisioned;
{
- RwLock::ExclusiveLockScope _(m_Lock);
+ RwLock::ExclusiveLockScope HubLock(m_Lock);
if (auto It = m_InstanceLookup.find(std::string(ModuleId)); It == m_InstanceLookup.end())
{
@@ -339,23 +350,15 @@ Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo)
{
auto NewInstance = std::make_unique<StorageServerInstance>(
m_RunEnvironment,
- *m_Hydration,
- StorageServerInstance::Configuration{.BasePort = GetInstanceIndexAssignedPort(ActiveInstanceIndex),
- .StateDir = m_RunEnvironment.CreateChildDir(ModuleId),
- .TempDir = m_HydrationTempPath / ModuleId,
- .HttpThreadCount = m_Config.InstanceHttpThreadCount,
- .CoreLimit = m_Config.InstanceCoreLimit,
- .ConfigPath = m_Config.InstanceConfigPath,
- .Malloc = m_Config.InstanceMalloc,
- .Trace = m_Config.InstanceTrace,
- .TraceHost = m_Config.InstanceTraceHost,
- .TraceFile = m_Config.InstanceTraceFile,
- .EnableHydration = m_Config.EnableHydration,
- .EnableDehydration = m_Config.EnableDehydration,
- .HydrationPackEnabled = m_Config.HydrationPackEnabled,
- .HydrationPackThresholdBytes = m_Config.HydrationPackThresholdBytes,
- .HydrationMaxPackBytes = m_Config.HydrationMaxPackBytes,
- .OptionalWorkerPool = m_Config.OptionalHydrationWorkerPool},
+ StorageServerInstance::Configuration{.BasePort = GetInstanceIndexAssignedPort(ActiveInstanceIndex),
+ .StateDir = m_RunEnvironment.CreateChildDir(ModuleId),
+ .HttpThreadCount = m_Config.InstanceHttpThreadCount,
+ .CoreLimit = m_Config.InstanceCoreLimit,
+ .ConfigPath = m_Config.InstanceConfigPath,
+ .Malloc = m_Config.InstanceMalloc,
+ .Trace = m_Config.InstanceTrace,
+ .TraceHost = m_Config.InstanceTraceHost,
+ .TraceFile = m_Config.InstanceTraceFile},
ModuleId);
#if ZEN_PLATFORM_WINDOWS
@@ -367,7 +370,8 @@ Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo)
Instance = NewInstance->LockExclusive(/*Wait*/ true);
- m_ActiveInstances[ActiveInstanceIndex].Instance = std::move(NewInstance);
+ m_ActiveInstances[ActiveInstanceIndex].Instance = std::move(NewInstance);
+ m_ActiveInstances[ActiveInstanceIndex].HydrationState = {};
m_ActiveInstances[ActiveInstanceIndex].ProcessMetrics.Reset();
m_InstanceLookup.insert_or_assign(std::string(ModuleId), ActiveInstanceIndex);
// Set Provisioning while both hub lock and instance lock are held so that any
@@ -378,7 +382,7 @@ Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo)
{
Instance = {};
m_ActiveInstances[ActiveInstanceIndex].Instance.reset();
- m_ActiveInstances[ActiveInstanceIndex].State.store(HubInstanceState::Unprovisioned);
+ UpdateInstanceState(HubLock, ActiveInstanceIndex, HubInstanceState::Unprovisioned);
m_InstanceLookup.erase(std::string(ModuleId));
m_FreeActiveInstanceIndexes.push_back(ActiveInstanceIndex);
throw;
@@ -419,7 +423,7 @@ Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo)
m_ActiveInstances[ActiveInstanceIndex].LastActivityTime.store(std::chrono::system_clock::now());
return Response{EResponseCode::Completed};
case HubInstanceState::Hibernated:
- _.ReleaseNow();
+ HubLock.ReleaseNow();
return Wake(std::string(ModuleId));
default:
return Response{EResponseCode::Rejected,
@@ -463,54 +467,54 @@ Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo)
NotifyStateUpdate(ModuleId, OldState, HubInstanceState::Provisioning, OutInfo.Port, {});
- if (m_WorkerPool)
+ const bool Async = m_ProvisionPool != nullptr && m_SpawnPool != nullptr;
+ if (Async)
{
+ auto SharedInstance = std::make_shared<StorageServerInstance::ExclusiveLockedPtr>(std::move(Instance));
m_BackgroundWorkLatch.AddCount(1);
try
{
- m_WorkerPool->ScheduleWork(
+ m_ProvisionPool->ScheduleWork(
[this,
ModuleId = std::string(ModuleId),
ActiveInstanceIndex,
OldState,
IsNewInstance,
- Instance = std::make_shared<StorageServerInstance::ExclusiveLockedPtr>(std::move(Instance))]() {
+ Port = OutInfo.Port,
+ Instance = SharedInstance]() {
auto _ = MakeGuard([this]() { m_BackgroundWorkLatch.CountDown(); });
+ if (!RunProvisionPhase1(*Instance, ActiveInstanceIndex, OldState, IsNewInstance, Port))
+ {
+ return;
+ }
+
+ m_BackgroundWorkLatch.AddCount(1);
try
{
- CompleteProvision(*Instance, ActiveInstanceIndex, OldState, IsNewInstance);
+ m_SpawnPool->ScheduleWork(
+ [this, ModuleId, ActiveInstanceIndex, OldState, IsNewInstance, Port, Instance]() {
+ auto _ = MakeGuard([this]() { m_BackgroundWorkLatch.CountDown(); });
+ RunProvisionPhase2(*Instance, ActiveInstanceIndex, OldState, IsNewInstance, Port);
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
}
- catch (const std::exception& Ex)
+ catch (const std::exception& DispatchEx)
{
- ZEN_ERROR("Failed async provision of module '{}': {}", ModuleId, Ex.what());
+ // Fallback: run Phase2 inline on the ProvisionPool worker. Couples pool
+ // lifetimes (this ProvisionPool slot now executes what should run on
+ // SpawnPool) but better than dropping the request.
+ ZEN_ERROR("Failed async dispatch of provision phase 2 for module '{}': {}", ModuleId, DispatchEx.what());
+ m_BackgroundWorkLatch.CountDown();
+ RunProvisionPhase2(*Instance, ActiveInstanceIndex, OldState, IsNewInstance, Port);
}
},
WorkerThreadPool::EMode::EnableBacklog);
}
catch (const std::exception& DispatchEx)
{
- // Dispatch failed: undo latch increment and roll back state.
- ZEN_ERROR("Failed async dispatch provision of module '{}': {}", ModuleId, DispatchEx.what());
+ ZEN_ERROR("Failed async dispatch of provision phase 1 for module '{}': {}", ModuleId, DispatchEx.what());
m_BackgroundWorkLatch.CountDown();
-
- // dispatch failed before the lambda ran, so ActiveInstance::State is still Provisioning
- NotifyStateUpdate(ModuleId, HubInstanceState::Provisioning, OldState, OutInfo.Port, {});
-
- std::unique_ptr<StorageServerInstance> DestroyInstance;
- {
- RwLock::ExclusiveLockScope HubLock(m_Lock);
- ZEN_ASSERT_SLOW(m_InstanceLookup.find(std::string(ModuleId)) != m_InstanceLookup.end());
- ZEN_ASSERT_SLOW(m_InstanceLookup.find(std::string(ModuleId))->second == ActiveInstanceIndex);
- if (IsNewInstance)
- {
- DestroyInstance = std::move(m_ActiveInstances[ActiveInstanceIndex].Instance);
- m_FreeActiveInstanceIndexes.push_back(ActiveInstanceIndex);
- m_InstanceLookup.erase(std::string(ModuleId));
- }
- UpdateInstanceState(HubLock, ActiveInstanceIndex, OldState);
- }
- DestroyInstance.reset();
-
+ RollbackFailedProvision(*SharedInstance, ActiveInstanceIndex, OldState, IsNewInstance, OutInfo.Port);
throw;
}
}
@@ -519,7 +523,7 @@ Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo)
CompleteProvision(Instance, ActiveInstanceIndex, OldState, IsNewInstance);
}
- return Response{m_WorkerPool ? EResponseCode::Accepted : EResponseCode::Completed};
+ return Response{Async ? EResponseCode::Accepted : EResponseCode::Completed};
}
void
@@ -529,38 +533,91 @@ Hub::CompleteProvision(StorageServerInstance::ExclusiveLockedPtr& Instance,
bool IsNewInstance)
{
ZEN_TRACE_CPU("Hub::CompleteProvision");
+ const uint16_t Port = Instance.GetBasePort();
+ if (RunProvisionPhase1(Instance, ActiveInstanceIndex, OldState, IsNewInstance, Port))
+ {
+ RunProvisionPhase2(Instance, ActiveInstanceIndex, OldState, IsNewInstance, Port);
+ }
+}
+
+bool
+Hub::RunProvisionPhase1(StorageServerInstance::ExclusiveLockedPtr& Instance,
+ size_t ActiveInstanceIndex,
+ HubInstanceState OldState,
+ bool IsNewInstance,
+ uint16_t Port)
+{
+ ZEN_TRACE_CPU("Hub::RunProvisionPhase1");
const std::string ModuleId(Instance.GetModuleId());
- const uint16_t Port = Instance.GetBasePort();
- std::string BaseUri; // TODO?
- if (m_ShutdownFlag.load() == false)
+ if (m_ShutdownFlag.load())
{
- try
- {
- switch (OldState)
- {
- case HubInstanceState::Crashed:
- case HubInstanceState::Unprovisioned:
- Instance.Provision();
- break;
- case HubInstanceState::Hibernated:
- ZEN_ASSERT(false); // unreachable: Provision redirects Hibernated->Wake before setting Provisioning
- break;
- default:
- ZEN_ASSERT(false);
- }
- UpdateInstanceState(Instance, ActiveInstanceIndex, HubInstanceState::Provisioned);
- NotifyStateUpdate(ModuleId, HubInstanceState::Provisioning, HubInstanceState::Provisioned, Port, BaseUri);
- Instance = {};
- return;
- }
- catch (const std::exception& Ex)
+ RollbackFailedProvision(Instance, ActiveInstanceIndex, OldState, IsNewInstance, Port);
+ return false;
+ }
+
+ try
+ {
+ switch (OldState)
{
- ZEN_ERROR("Failed to provision storage server instance for module '{}': {}", ModuleId, Ex.what());
- // Instance will be notified and removed below.
+ case HubInstanceState::Crashed:
+ case HubInstanceState::Unprovisioned:
+ HydrateInstance(ActiveInstanceIndex, ModuleId);
+ break;
+ case HubInstanceState::Hibernated:
+ ZEN_ASSERT(false); // unreachable: Provision redirects Hibernated->Wake before setting Provisioning
+ break;
+ default:
+ ZEN_ASSERT(false);
}
+ return true;
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("Failed to hydrate storage server instance for module '{}': {}", ModuleId, Ex.what());
+ RollbackFailedProvision(Instance, ActiveInstanceIndex, OldState, IsNewInstance, Port);
+ return false;
+ }
+}
+
+void
+Hub::RunProvisionPhase2(StorageServerInstance::ExclusiveLockedPtr& Instance,
+ size_t ActiveInstanceIndex,
+ HubInstanceState OldState,
+ bool IsNewInstance,
+ uint16_t Port)
+{
+ ZEN_TRACE_CPU("Hub::RunProvisionPhase2");
+ const std::string ModuleId(Instance.GetModuleId());
+
+ if (m_ShutdownFlag.load())
+ {
+ RollbackFailedProvision(Instance, ActiveInstanceIndex, OldState, IsNewInstance, Port);
+ return;
+ }
+
+ try
+ {
+ Instance.Provision();
+ UpdateInstanceState(Instance, ActiveInstanceIndex, HubInstanceState::Provisioned);
+ NotifyStateUpdate(ModuleId, HubInstanceState::Provisioning, HubInstanceState::Provisioned, Port, {});
+ Instance = {};
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("Failed to provision storage server instance for module '{}': {}", ModuleId, Ex.what());
+ RollbackFailedProvision(Instance, ActiveInstanceIndex, OldState, IsNewInstance, Port);
}
+}
+void
+Hub::RollbackFailedProvision(StorageServerInstance::ExclusiveLockedPtr& Instance,
+ size_t ActiveInstanceIndex,
+ HubInstanceState OldState,
+ bool IsNewInstance,
+ uint16_t Port)
+{
+ const std::string ModuleId(Instance.GetModuleId());
if (IsNewInstance)
{
NotifyStateUpdate(ModuleId, HubInstanceState::Provisioning, HubInstanceState::Unprovisioned, Port, {});
@@ -568,11 +625,11 @@ Hub::CompleteProvision(StorageServerInstance::ExclusiveLockedPtr& Instance,
std::unique_ptr<StorageServerInstance> DestroyInstance;
{
RwLock::ExclusiveLockScope HubLock(m_Lock);
- ZEN_ASSERT_SLOW(m_InstanceLookup.find(std::string(ModuleId)) != m_InstanceLookup.end());
- ZEN_ASSERT_SLOW(m_InstanceLookup.find(std::string(ModuleId))->second == ActiveInstanceIndex);
+ ZEN_ASSERT_SLOW(m_InstanceLookup.find(ModuleId) != m_InstanceLookup.end());
+ ZEN_ASSERT_SLOW(m_InstanceLookup.find(ModuleId)->second == ActiveInstanceIndex);
DestroyInstance = std::move(m_ActiveInstances[ActiveInstanceIndex].Instance);
m_FreeActiveInstanceIndexes.push_back(ActiveInstanceIndex);
- m_InstanceLookup.erase(std::string(ModuleId));
+ m_InstanceLookup.erase(ModuleId);
UpdateInstanceState(HubLock, ActiveInstanceIndex, HubInstanceState::Unprovisioned);
}
DestroyInstance.reset();
@@ -652,11 +709,7 @@ Hub::InternalDeprovision(const std::string& ModuleId, std::function<bool(ActiveI
}
}
- // NOTE: done while not holding the hub lock, to avoid blocking other operations.
- // The exclusive instance lock acquired above prevents concurrent LockExclusive callers
- // from modifying instance state. The state transition to Deprovisioning happens below,
- // after the hub lock is released.
-
+ // Outside hub lock: exclusive instance lock above blocks concurrent state mutation. See Provision for the locking argument.
ZEN_ASSERT(Instance);
ZEN_ASSERT(ActiveInstanceIndex != (size_t)-1);
@@ -664,32 +717,55 @@ Hub::InternalDeprovision(const std::string& ModuleId, std::function<bool(ActiveI
const uint16_t Port = Instance.GetBasePort();
NotifyStateUpdate(ModuleId, OldState, HubInstanceState::Deprovisioning, Port, {});
- if (m_WorkerPool)
+ const bool Async = m_ProvisionPool != nullptr && m_SpawnPool != nullptr;
+ if (Async)
{
- std::shared_ptr<StorageServerInstance::ExclusiveLockedPtr> SharedInstancePtr =
- std::make_shared<StorageServerInstance::ExclusiveLockedPtr>(std::move(Instance));
-
+ auto SharedInstance = std::make_shared<StorageServerInstance::ExclusiveLockedPtr>(std::move(Instance));
m_BackgroundWorkLatch.AddCount(1);
try
{
- m_WorkerPool->ScheduleWork(
- [this, ModuleId = std::string(ModuleId), ActiveInstanceIndex, Instance = std::move(SharedInstancePtr), OldState]() mutable {
+ m_SpawnPool->ScheduleWork(
+ [this, ModuleId = std::string(ModuleId), ActiveInstanceIndex, OldState, Port, Instance = SharedInstance]() {
auto _ = MakeGuard([this]() { m_BackgroundWorkLatch.CountDown(); });
try
{
- CompleteDeprovision(*Instance, ActiveInstanceIndex, OldState);
+ RunDeprovisionPhase1(*Instance, ActiveInstanceIndex, OldState, Port);
}
catch (const std::exception& Ex)
{
- ZEN_ERROR("Failed async deprovision of module '{}': {}", ModuleId, Ex.what());
+ // Phase1 transitions the instance to Crashed before rethrowing,
+ // so the watchdog will pick this up via AttemptRecoverInstance.
+ // The deprovision request silently morphs into a recovery cycle;
+ // caller already saw EResponseCode::Accepted.
+ ZEN_ERROR("Failed async deprovision phase 1 of module '{}': {}", ModuleId, Ex.what());
+ return;
+ }
+
+ m_BackgroundWorkLatch.AddCount(1);
+ try
+ {
+ m_ProvisionPool->ScheduleWork(
+ [this, ModuleId, ActiveInstanceIndex, Port, Instance]() {
+ auto _ = MakeGuard([this]() { m_BackgroundWorkLatch.CountDown(); });
+ RunDeprovisionPhase2(*Instance, ActiveInstanceIndex, Port);
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
+ }
+ catch (const std::exception& DispatchEx)
+ {
+ // Fallback: run Phase2 inline on the SpawnPool worker. Couples pool
+ // lifetimes (this SpawnPool slot now executes what should run on
+ // ProvisionPool) but better than dropping the request.
+ ZEN_ERROR("Failed async dispatch of deprovision phase 2 for module '{}': {}", ModuleId, DispatchEx.what());
+ m_BackgroundWorkLatch.CountDown();
+ RunDeprovisionPhase2(*Instance, ActiveInstanceIndex, Port);
}
},
WorkerThreadPool::EMode::EnableBacklog);
}
catch (const std::exception& DispatchEx)
{
- // Dispatch failed: undo latch increment and roll back state.
- ZEN_ERROR("Failed async dispatch deprovision of module '{}': {}", ModuleId, DispatchEx.what());
+ ZEN_ERROR("Failed async dispatch of deprovision phase 1 for module '{}': {}", ModuleId, DispatchEx.what());
m_BackgroundWorkLatch.CountDown();
NotifyStateUpdate(ModuleId, HubInstanceState::Deprovisioning, OldState, Port, {});
@@ -708,7 +784,7 @@ Hub::InternalDeprovision(const std::string& ModuleId, std::function<bool(ActiveI
CompleteDeprovision(Instance, ActiveInstanceIndex, OldState);
}
- return Response{m_WorkerPool ? EResponseCode::Accepted : EResponseCode::Completed};
+ return Response{Async ? EResponseCode::Accepted : EResponseCode::Completed};
}
Hub::Response
@@ -766,12 +842,12 @@ Hub::Obliterate(const std::string& ModuleId)
m_ObliteratingInstances.insert(ModuleId);
Lock.ReleaseNow();
- if (m_WorkerPool)
+ if (m_ProvisionPool != nullptr)
{
m_BackgroundWorkLatch.AddCount(1);
try
{
- m_WorkerPool->ScheduleWork(
+ m_ProvisionPool->ScheduleWork(
[this, ModuleId = std::string(ModuleId)]() {
auto Guard = MakeGuard([this, ModuleId]() {
m_Lock.WithExclusiveLock([this, ModuleId]() { m_ObliteratingInstances.erase(ModuleId); });
@@ -817,31 +893,51 @@ Hub::Obliterate(const std::string& ModuleId)
const uint16_t Port = Instance.GetBasePort();
NotifyStateUpdate(ModuleId, OldState, HubInstanceState::Obliterating, Port, {});
- if (m_WorkerPool)
+ const bool Async = m_ProvisionPool != nullptr && m_SpawnPool != nullptr;
+ if (Async)
{
- std::shared_ptr<StorageServerInstance::ExclusiveLockedPtr> SharedInstancePtr =
- std::make_shared<StorageServerInstance::ExclusiveLockedPtr>(std::move(Instance));
-
+ auto SharedInstance = std::make_shared<StorageServerInstance::ExclusiveLockedPtr>(std::move(Instance));
m_BackgroundWorkLatch.AddCount(1);
try
{
- m_WorkerPool->ScheduleWork(
- [this, ModuleId = std::string(ModuleId), ActiveInstanceIndex, Instance = std::move(SharedInstancePtr)]() mutable {
+ m_SpawnPool->ScheduleWork(
+ [this, ModuleId = std::string(ModuleId), ActiveInstanceIndex, Port, Instance = SharedInstance]() {
auto _ = MakeGuard([this]() { m_BackgroundWorkLatch.CountDown(); });
try
{
- CompleteObliterate(*Instance, ActiveInstanceIndex);
+ RunObliteratePhase1(*Instance, ActiveInstanceIndex, Port);
}
catch (const std::exception& Ex)
{
- ZEN_ERROR("Failed async obliterate of module '{}': {}", ModuleId, Ex.what());
+ ZEN_ERROR("Failed async obliterate phase 1 of module '{}': {}", ModuleId, Ex.what());
+ return;
+ }
+
+ m_BackgroundWorkLatch.AddCount(1);
+ try
+ {
+ m_ProvisionPool->ScheduleWork(
+ [this, ModuleId, ActiveInstanceIndex, Port, Instance]() {
+ auto _ = MakeGuard([this]() { m_BackgroundWorkLatch.CountDown(); });
+ RunObliteratePhase2(*Instance, ActiveInstanceIndex, Port);
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
+ }
+ catch (const std::exception& DispatchEx)
+ {
+ // Fallback: run Phase2 inline on the SpawnPool worker. Couples pool
+ // lifetimes (this SpawnPool slot now executes what should run on
+ // ProvisionPool) but better than dropping the request.
+ ZEN_ERROR("Failed async dispatch of obliterate phase 2 for module '{}': {}", ModuleId, DispatchEx.what());
+ m_BackgroundWorkLatch.CountDown();
+ RunObliteratePhase2(*Instance, ActiveInstanceIndex, Port);
}
},
WorkerThreadPool::EMode::EnableBacklog);
}
catch (const std::exception& DispatchEx)
{
- ZEN_ERROR("Failed async dispatch obliterate of module '{}': {}", ModuleId, DispatchEx.what());
+ ZEN_ERROR("Failed async dispatch obliterate phase 1 of module '{}': {}", ModuleId, DispatchEx.what());
m_BackgroundWorkLatch.CountDown();
NotifyStateUpdate(ModuleId, HubInstanceState::Obliterating, OldState, Port, {});
@@ -860,15 +956,23 @@ Hub::Obliterate(const std::string& ModuleId)
CompleteObliterate(Instance, ActiveInstanceIndex);
}
- return Response{m_WorkerPool ? EResponseCode::Accepted : EResponseCode::Completed};
+ return Response{Async ? EResponseCode::Accepted : EResponseCode::Completed};
}
void
Hub::CompleteObliterate(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex)
{
ZEN_TRACE_CPU("Hub::CompleteObliterate");
+ const uint16_t Port = Instance.GetBasePort();
+ RunObliteratePhase1(Instance, ActiveInstanceIndex, Port);
+ RunObliteratePhase2(Instance, ActiveInstanceIndex, Port);
+}
+
+void
+Hub::RunObliteratePhase1(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex, uint16_t Port)
+{
+ ZEN_TRACE_CPU("Hub::RunObliteratePhase1");
const std::string ModuleId(Instance.GetModuleId());
- const uint16_t Port = Instance.GetBasePort();
try
{
@@ -876,15 +980,35 @@ Hub::CompleteObliterate(StorageServerInstance::ExclusiveLockedPtr& Instance, siz
}
catch (const std::exception& Ex)
{
+ // Best-effort cleanup: drop tracking and mark Unprovisioned. Transitioning to
+ // Crashed would let the watchdog re-provision a module the operator wanted gone.
ZEN_ERROR("Failed to obliterate storage server instance for module '{}': {}", ModuleId, Ex.what());
- Instance = {};
- {
- RwLock::ExclusiveLockScope HubLock(m_Lock);
- UpdateInstanceState(HubLock, ActiveInstanceIndex, HubInstanceState::Crashed);
- }
- NotifyStateUpdate(ModuleId, HubInstanceState::Obliterating, HubInstanceState::Crashed, Port, {});
+ NotifyStateUpdate(ModuleId, HubInstanceState::Obliterating, HubInstanceState::Unprovisioned, Port, {});
+ RemoveInstance(Instance, ActiveInstanceIndex, ModuleId);
throw;
}
+}
+
+void
+Hub::RunObliteratePhase2(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex, uint16_t Port)
+{
+ ZEN_TRACE_CPU("Hub::RunObliteratePhase2");
+ const std::string ModuleId(Instance.GetModuleId());
+
+ try
+ {
+ ObliterateBackendData(ModuleId);
+ }
+ catch (const std::exception& Ex)
+ {
+ // Backend delete failed - documented leak path (see hydration.cpp Obliterate
+ // retry-then-fail). Drop local tracking and mark Unprovisioned; the watchdog
+ // must not re-provision a module the operator wanted gone.
+ ZEN_ERROR("Failed to obliterate backend data for module '{}': {}", ModuleId, Ex.what());
+ NotifyStateUpdate(ModuleId, HubInstanceState::Obliterating, HubInstanceState::Unprovisioned, Port, {});
+ RemoveInstance(Instance, ActiveInstanceIndex, ModuleId);
+ return;
+ }
NotifyStateUpdate(ModuleId, HubInstanceState::Obliterating, HubInstanceState::Unprovisioned, Port, {});
RemoveInstance(Instance, ActiveInstanceIndex, ModuleId);
@@ -894,8 +1018,19 @@ void
Hub::CompleteDeprovision(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex, HubInstanceState OldState)
{
ZEN_TRACE_CPU("Hub::CompleteDeprovision");
+ const uint16_t Port = Instance.GetBasePort();
+ RunDeprovisionPhase1(Instance, ActiveInstanceIndex, OldState, Port);
+ RunDeprovisionPhase2(Instance, ActiveInstanceIndex, Port);
+}
+
+void
+Hub::RunDeprovisionPhase1(StorageServerInstance::ExclusiveLockedPtr& Instance,
+ size_t ActiveInstanceIndex,
+ HubInstanceState OldState,
+ uint16_t Port)
+{
+ ZEN_TRACE_CPU("Hub::RunDeprovisionPhase1");
const std::string ModuleId(Instance.GetModuleId());
- const uint16_t Port = Instance.GetBasePort();
try
{
@@ -942,9 +1077,8 @@ Hub::CompleteDeprovision(StorageServerInstance::ExclusiveLockedPtr& Instance, si
catch (const std::exception& Ex)
{
ZEN_ERROR("Failed to deprovision storage server instance for module '{}': {}", ModuleId, Ex.what());
- // Effectively unreachable: Shutdown() never throws and Dehydrate() failures are swallowed
- // by DeprovisionLocked. Kept as a safety net; if somehow reached, transition to Crashed
- // so the watchdog can attempt recovery.
+ // GcClient HTTP calls and Instance.Deprovision can throw on transport failure
+ // or watchdog races; transition to Crashed so the watchdog can attempt recovery.
Instance = {};
{
RwLock::ExclusiveLockScope HubLock(m_Lock);
@@ -953,6 +1087,22 @@ Hub::CompleteDeprovision(StorageServerInstance::ExclusiveLockedPtr& Instance, si
NotifyStateUpdate(ModuleId, HubInstanceState::Deprovisioning, HubInstanceState::Crashed, Port, {});
throw;
}
+}
+
+void
+Hub::RunDeprovisionPhase2(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex, uint16_t Port)
+{
+ ZEN_TRACE_CPU("Hub::RunDeprovisionPhase2");
+ const std::string ModuleId(Instance.GetModuleId());
+
+ try
+ {
+ DehydrateInstance(ActiveInstanceIndex, ModuleId);
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_WARN("Dehydration of module {} failed during deprovisioning, current state not saved. Reason: {}", ModuleId, Ex.what());
+ }
NotifyStateUpdate(ModuleId, HubInstanceState::Deprovisioning, HubInstanceState::Unprovisioned, Port, {});
RemoveInstance(Instance, ActiveInstanceIndex, ModuleId);
@@ -1012,11 +1162,7 @@ Hub::Hibernate(const std::string& ModuleId)
}
}
- // NOTE: done while not holding the hub lock, to avoid blocking other operations.
- // Any concurrent caller that acquired the hub lock and saw Provisioned will now block on
- // LockExclusive(Wait=true); by the time it acquires the lock, UpdateInstanceState below
- // will have already changed the state and the re-validate above will reject it.
-
+ // Outside hub lock: re-validate after re-locking in worker rejects races. See Provision for the locking argument.
ZEN_ASSERT(Instance);
ZEN_ASSERT(ActiveInstanceIndex != (size_t)-1);
@@ -1024,12 +1170,12 @@ Hub::Hibernate(const std::string& ModuleId)
const uint16_t Port = Instance.GetBasePort();
NotifyStateUpdate(ModuleId, OldState, HubInstanceState::Hibernating, Port, {});
- if (m_WorkerPool)
+ if (m_SpawnPool != nullptr)
{
m_BackgroundWorkLatch.AddCount(1);
try
{
- m_WorkerPool->ScheduleWork(
+ m_SpawnPool->ScheduleWork(
[this,
ModuleId = std::string(ModuleId),
ActiveInstanceIndex,
@@ -1069,7 +1215,7 @@ Hub::Hibernate(const std::string& ModuleId)
CompleteHibernate(Instance, ActiveInstanceIndex, OldState);
}
- return Response{m_WorkerPool ? EResponseCode::Accepted : EResponseCode::Completed};
+ return Response{m_SpawnPool != nullptr ? EResponseCode::Accepted : EResponseCode::Completed};
}
void
@@ -1148,11 +1294,7 @@ Hub::Wake(const std::string& ModuleId)
}
}
- // NOTE: done while not holding the hub lock, to avoid blocking other operations.
- // Any concurrent caller that acquired the hub lock and saw Hibernated will now block on
- // LockExclusive(Wait=true); by the time it acquires the lock, UpdateInstanceState below
- // will have already changed the state and the re-validate above will reject it.
-
+ // Outside hub lock: re-validate after re-locking in worker rejects races. See Provision for the locking argument.
ZEN_ASSERT(Instance);
ZEN_ASSERT(ActiveInstanceIndex != (size_t)-1);
@@ -1160,12 +1302,12 @@ Hub::Wake(const std::string& ModuleId)
const uint16_t Port = Instance.GetBasePort();
NotifyStateUpdate(ModuleId, OldState, HubInstanceState::Waking, Port, {});
- if (m_WorkerPool)
+ if (m_SpawnPool != nullptr)
{
m_BackgroundWorkLatch.AddCount(1);
try
{
- m_WorkerPool->ScheduleWork(
+ m_SpawnPool->ScheduleWork(
[this,
ModuleId = std::string(ModuleId),
ActiveInstanceIndex,
@@ -1205,7 +1347,7 @@ Hub::Wake(const std::string& ModuleId)
CompleteWake(Instance, ActiveInstanceIndex, OldState);
}
- return Response{m_WorkerPool ? EResponseCode::Accepted : EResponseCode::Completed};
+ return Response{m_SpawnPool != nullptr ? EResponseCode::Accepted : EResponseCode::Completed};
}
void
@@ -1243,7 +1385,8 @@ Hub::RemoveInstance(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t
auto It = m_InstanceLookup.find(std::string(ModuleId));
ZEN_ASSERT_SLOW(It != m_InstanceLookup.end());
ZEN_ASSERT_SLOW(It->second == ActiveInstanceIndex);
- DeleteInstance = std::move(m_ActiveInstances[ActiveInstanceIndex].Instance);
+ DeleteInstance = std::move(m_ActiveInstances[ActiveInstanceIndex].Instance);
+ m_ActiveInstances[ActiveInstanceIndex].HydrationState = {};
m_FreeActiveInstanceIndexes.push_back(ActiveInstanceIndex);
m_InstanceLookup.erase(It);
UpdateInstanceState(HubLock, ActiveInstanceIndex, HubInstanceState::Unprovisioned);
@@ -1251,23 +1394,64 @@ Hub::RemoveInstance(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t
DeleteInstance.reset();
}
+HydrationConfig
+Hub::MakeHydrationConfigForModule(std::string_view ModuleId, std::atomic<bool>& AbortFlag, std::atomic<bool>& PauseFlag) const
+{
+ HydrationConfig Config{.ServerStateDir = m_RunEnvironment.GetChildBaseDir() / ModuleId,
+ .TempDir = m_HydrationTempPath / ModuleId,
+ .ModuleId = std::string(ModuleId)};
+ if (m_Config.OptionalHydrationPool)
+ {
+ Config.Threading.emplace(HydrationConfig::ThreadingOptions{.WorkerPool = m_Config.OptionalHydrationPool,
+ .AbortFlag = &AbortFlag,
+ .PauseFlag = &PauseFlag});
+ }
+ Config.PackEnabled = m_Config.HydrationPackEnabled;
+ Config.PackThresholdBytes = m_Config.HydrationPackThresholdBytes;
+ Config.MaxPackBytes = m_Config.HydrationMaxPackBytes;
+ return Config;
+}
+
void
-Hub::ObliterateBackendData(std::string_view ModuleId)
+Hub::HydrateInstance(size_t ActiveInstanceIndex, std::string_view ModuleId)
{
- std::filesystem::path ServerStateDir = m_RunEnvironment.GetChildBaseDir() / ModuleId;
- std::filesystem::path TempDir = m_HydrationTempPath / ModuleId;
+ if (!m_Config.EnableHydration)
+ {
+ ZEN_INFO("Hydration disabled; skipping hydrate for module '{}'", ModuleId);
+ return;
+ }
+ ZEN_TRACE_CPU("Hub::HydrateInstance");
- std::atomic<bool> AbortFlag{false};
- std::atomic<bool> PauseFlag{false};
+ std::atomic<bool> AbortFlag{false};
+ std::atomic<bool> PauseFlag{false};
+ HydrationConfig Config = MakeHydrationConfigForModule(ModuleId, AbortFlag, PauseFlag);
+ std::unique_ptr<HydrationStrategyBase> Hydrator = m_Hydration->CreateHydrator(Config);
+ m_ActiveInstances[ActiveInstanceIndex].HydrationState = Hydrator->Hydrate();
+}
- HydrationConfig Config{.ServerStateDir = ServerStateDir, .TempDir = TempDir, .ModuleId = std::string(ModuleId)};
- if (m_Config.OptionalHydrationWorkerPool)
+void
+Hub::DehydrateInstance(size_t ActiveInstanceIndex, std::string_view ModuleId)
+{
+ if (!m_Config.EnableDehydration)
{
- Config.Threading.emplace(HydrationConfig::ThreadingOptions{.WorkerPool = m_Config.OptionalHydrationWorkerPool,
- .AbortFlag = &AbortFlag,
- .PauseFlag = &PauseFlag});
+ ZEN_INFO("Dehydration disabled; skipping dehydrate for module '{}'", ModuleId);
+ return;
}
+ ZEN_TRACE_CPU("Hub::DehydrateInstance");
+ std::atomic<bool> AbortFlag{false};
+ std::atomic<bool> PauseFlag{false};
+ HydrationConfig Config = MakeHydrationConfigForModule(ModuleId, AbortFlag, PauseFlag);
+ std::unique_ptr<HydrationStrategyBase> Hydrator = m_Hydration->CreateHydrator(Config);
+ Hydrator->Dehydrate(m_ActiveInstances[ActiveInstanceIndex].HydrationState);
+}
+
+void
+Hub::ObliterateBackendData(std::string_view ModuleId)
+{
+ std::atomic<bool> AbortFlag{false};
+ std::atomic<bool> PauseFlag{false};
+ HydrationConfig Config = MakeHydrationConfigForModule(ModuleId, AbortFlag, PauseFlag);
std::unique_ptr<HydrationStrategyBase> Hydrator = m_Hydration->CreateHydrator(Config);
Hydrator->Obliterate();
}
@@ -1478,6 +1662,16 @@ Hub::AttemptRecoverInstance(std::string_view ModuleId)
try
{
Instance.Deprovision();
+ try
+ {
+ DehydrateInstance(ActiveInstanceIndex, ModuleId);
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_WARN("Dehydration of module {} failed during crash recovery cleanup, current state not saved. Reason: {}",
+ ModuleId,
+ Ex.what());
+ }
}
catch (const std::exception& Ex)
{
@@ -1502,6 +1696,7 @@ Hub::AttemptRecoverInstance(std::string_view ModuleId)
try
{
+ HydrateInstance(ActiveInstanceIndex, ModuleId);
Instance.Provision();
UpdateInstanceState(Instance, ActiveInstanceIndex, HubInstanceState::Provisioned);
NotifyStateUpdate(ModuleId, HubInstanceState::Recovering, HubInstanceState::Provisioned, Instance.GetBasePort(), /*BaseUri*/ {});
@@ -1795,7 +1990,6 @@ Hub::WatchDog()
}
catch (const std::exception& Ex)
{
- // TODO: Catch specific errors such as asserts, OOM, OOD, system_error etc
ZEN_ERROR("Hub watchdog threw exception: {}", Ex.what());
}
}
@@ -1835,9 +2029,15 @@ namespace hub_testutils {
struct TestHubPools
{
WorkerThreadPool ProvisionPool;
+ WorkerThreadPool SpawnPool;
WorkerThreadPool HydrationPool;
- explicit TestHubPools(int ThreadCount) : ProvisionPool(ThreadCount, "hub_test_prov"), HydrationPool(ThreadCount, "hub_test_hydr") {}
+ explicit TestHubPools(int ThreadCount)
+ : ProvisionPool(ThreadCount, "hub_test_provision")
+ , SpawnPool(ThreadCount, "hub_test_spawn")
+ , HydrationPool(ThreadCount, "hub_test_hydr")
+ {
+ }
};
ZenServerEnvironment MakeHubEnvironment(const std::filesystem::path& BaseDir)
@@ -1852,8 +2052,9 @@ namespace hub_testutils {
{
if (Pools)
{
- Config.OptionalProvisionWorkerPool = &Pools->ProvisionPool;
- Config.OptionalHydrationWorkerPool = &Pools->HydrationPool;
+ Config.OptionalProvisionPool = &Pools->ProvisionPool;
+ Config.OptionalSpawnPool = &Pools->SpawnPool;
+ Config.OptionalHydrationPool = &Pools->HydrationPool;
}
return std::make_unique<Hub>(Config, MakeHubEnvironment(BaseDir), std::move(StateChangeCallback));
}