aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/zenserver/hub/hub.cpp217
-rw-r--r--src/zenserver/hub/hub.h8
-rw-r--r--src/zenserver/hub/storageserverinstance.cpp32
3 files changed, 105 insertions, 152 deletions
diff --git a/src/zenserver/hub/hub.cpp b/src/zenserver/hub/hub.cpp
index 8c8a98322..0674fe7bb 100644
--- a/src/zenserver/hub/hub.cpp
+++ b/src/zenserver/hub/hub.cpp
@@ -157,9 +157,6 @@ Hub::Hub(const Configuration& Config,
m_InstanceLookup.reserve(Config.InstanceLimit);
std::iota(m_FreeActiveInstanceIndexes.begin(), m_FreeActiveInstanceIndexes.end(), 0);
- m_FreePorts.resize(Config.InstanceLimit);
- std::iota(m_FreePorts.begin(), m_FreePorts.end(), Config.BasePortNumber);
-
#if ZEN_PLATFORM_WINDOWS
if (m_Config.UseJobObject)
{
@@ -243,17 +240,8 @@ Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo)
ZEN_ASSERT(!m_ShutdownFlag.load());
StorageServerInstance::ExclusiveLockedPtr Instance;
bool IsNewInstance = false;
- uint16_t AllocatedPort = 0;
{
RwLock::ExclusiveLockScope _(m_Lock);
- auto RestoreAllocatedPort = MakeGuard([this, ModuleId, &IsNewInstance, &AllocatedPort]() {
- if (IsNewInstance && AllocatedPort != 0 && !m_InstanceLookup.contains(std::string(ModuleId)))
- {
- m_FreePorts.push_back(AllocatedPort);
- AllocatedPort = 0;
- }
- });
-
if (auto It = m_ProvisioningModules.find(std::string(ModuleId)); It != m_ProvisioningModules.end())
{
// Same operation already in flight -- return the already-allocated port.
@@ -275,13 +263,13 @@ Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo)
IsNewInstance = true;
- AllocatedPort = m_FreePorts.front();
- ZEN_ASSERT(AllocatedPort != 0);
- m_FreePorts.pop_front();
+ size_t ActiveInstanceIndex = m_FreeActiveInstanceIndexes.front();
+ m_FreeActiveInstanceIndexes.pop_front();
+ ZEN_ASSERT(m_ActiveInstances.size() > ActiveInstanceIndex);
auto NewInstance = std::make_unique<StorageServerInstance>(
m_RunEnvironment,
- StorageServerInstance::Configuration{.BasePort = AllocatedPort,
+ StorageServerInstance::Configuration{.BasePort = gsl::narrow<uint16_t>(m_Config.BasePortNumber + ActiveInstanceIndex),
.HydrationTempPath = m_HydrationTempPath,
.HydrationTargetSpecification = m_HydrationTargetSpecification,
.HttpThreadCount = m_Config.InstanceHttpThreadCount,
@@ -298,11 +286,6 @@ Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo)
Instance = NewInstance->LockExclusive(/*Wait*/ true);
- size_t ActiveInstanceIndex = (size_t)-1;
- ActiveInstanceIndex = m_FreeActiveInstanceIndexes.back();
- m_FreeActiveInstanceIndexes.pop_back();
- ZEN_ASSERT(m_ActiveInstances.size() > ActiveInstanceIndex);
-
m_ActiveInstances[ActiveInstanceIndex].Instance = std::move(NewInstance);
m_InstanceLookup.insert_or_assign(std::string(ModuleId), ActiveInstanceIndex);
@@ -316,15 +299,15 @@ Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo)
}
else
{
- const size_t ActiveInstanceIndex = It->second;
- ZEN_ASSERT(m_ActiveInstances.size() > ActiveInstanceIndex);
-
if (m_RecoveringModules.contains(std::string(ModuleId)))
{
ZEN_WARN("Attempted to provision module '{}' which is currently recovering", ModuleId);
return Response{EResponseCode::Rejected, fmt::format("Module '{}' is currently recovering from a crash", ModuleId)};
}
+ size_t ActiveInstanceIndex = It->second;
+ ZEN_ASSERT(m_ActiveInstances.size() > ActiveInstanceIndex);
+
std::unique_ptr<StorageServerInstance>& InstanceRaw = m_ActiveInstances[ActiveInstanceIndex].Instance;
ZEN_ASSERT(InstanceRaw);
@@ -334,11 +317,10 @@ Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo)
return Response{EResponseCode::Completed};
}
- Instance = InstanceRaw->LockExclusive(/*Wait*/ true);
- AllocatedPort = InstanceRaw->GetBasePort();
+ Instance = InstanceRaw->LockExclusive(/*Wait*/ true);
}
- m_ProvisioningModules.emplace(std::string(ModuleId), AllocatedPort);
+ m_ProvisioningModules.emplace(std::string(ModuleId), Instance.GetBasePort());
}
// NOTE: done while not holding the hub lock, to avoid blocking other operations.
@@ -347,6 +329,8 @@ Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo)
ZEN_ASSERT(Instance);
+ OutInfo.Port = Instance.GetBasePort();
+
if (m_WorkerPool)
{
m_BackgroundWorkLatch.AddCount(1);
@@ -355,13 +339,12 @@ Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo)
m_WorkerPool->ScheduleWork(
[this,
ModuleId = std::string(ModuleId),
- AllocatedPort,
IsNewInstance,
Instance = std::make_shared<StorageServerInstance::ExclusiveLockedPtr>(std::move(Instance))]() {
auto _ = MakeGuard([this]() { m_BackgroundWorkLatch.CountDown(); });
try
{
- CompleteProvision(*Instance, AllocatedPort, IsNewInstance);
+ CompleteProvision(*Instance, IsNewInstance);
}
catch (const std::exception& Ex)
{
@@ -392,10 +375,6 @@ Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo)
{
RwLock::ExclusiveLockScope _(m_Lock);
m_ProvisioningModules.erase(std::string(ModuleId));
- if (IsNewInstance && AllocatedPort != 0 && !m_InstanceLookup.contains(std::string(ModuleId)))
- {
- m_FreePorts.push_back(AllocatedPort);
- }
}
throw;
@@ -403,16 +382,14 @@ Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo)
}
else
{
- CompleteProvision(Instance, AllocatedPort, IsNewInstance);
+ CompleteProvision(Instance, IsNewInstance);
}
- OutInfo.Port = AllocatedPort;
-
return Response{m_WorkerPool ? EResponseCode::Accepted : EResponseCode::Completed};
}
void
-Hub::CompleteProvision(StorageServerInstance::ExclusiveLockedPtr& Instance, uint16_t AllocatedPort, bool IsNewInstance)
+Hub::CompleteProvision(StorageServerInstance::ExclusiveLockedPtr& Instance, bool IsNewInstance)
{
const std::string ModuleId(Instance.GetModuleId());
uint16_t BasePort = Instance.GetBasePort();
@@ -424,10 +401,6 @@ Hub::CompleteProvision(StorageServerInstance::ExclusiveLockedPtr& Instance, uint
auto RemoveProvisioningModule = MakeGuard([&] {
RwLock::ExclusiveLockScope _(m_Lock);
m_ProvisioningModules.erase(std::string(ModuleId));
- if (IsNewInstance && AllocatedPort != 0 && !m_InstanceLookup.contains(std::string(ModuleId)))
- {
- m_FreePorts.push_back(AllocatedPort);
- }
});
if (m_ShutdownFlag.load() == false)
@@ -435,9 +408,8 @@ Hub::CompleteProvision(StorageServerInstance::ExclusiveLockedPtr& Instance, uint
try
{
(void)Instance.Provision(); // false = already in target state (idempotent); not an error
- NewState = Instance.GetState();
- AllocatedPort = 0;
- Instance = {};
+ NewState = Instance.GetState();
+ Instance = {};
}
catch (const std::exception& Ex)
{
@@ -498,7 +470,6 @@ Hub::Deprovision(const std::string& ModuleId)
Hub::Response
Hub::InternalDeprovision(const std::string& ModuleId)
{
- std::unique_ptr<StorageServerInstance> RawInstance;
StorageServerInstance::ExclusiveLockedPtr Instance;
{
@@ -532,11 +503,9 @@ Hub::InternalDeprovision(const std::string& ModuleId)
{
const size_t ActiveInstanceIndex = It->second;
ZEN_ASSERT(ActiveInstanceIndex < m_ActiveInstances.size());
- RawInstance = std::move(m_ActiveInstances[ActiveInstanceIndex].Instance);
+ std::unique_ptr<StorageServerInstance>& RawInstance = m_ActiveInstances[ActiveInstanceIndex].Instance;
ZEN_ASSERT(RawInstance != nullptr);
- m_FreeActiveInstanceIndexes.push_back(ActiveInstanceIndex);
- m_InstanceLookup.erase(It);
m_DeprovisioningModules.emplace(ModuleId, RawInstance->GetBasePort());
Instance = RawInstance->LockExclusive(/*Wait*/ true);
@@ -547,24 +516,25 @@ Hub::InternalDeprovision(const std::string& ModuleId)
// m_DeprovisioningModules tracks which modules are being deprovisioned, blocking
// concurrent Hibernate/Wake/Provision/Deprovision attempts on the same module.
- ZEN_ASSERT(RawInstance);
ZEN_ASSERT(Instance);
if (m_WorkerPool)
{
+ HubInstanceState OldState = Instance.GetState();
+ HubInstanceState NewState = OldState;
+ uint16_t BasePort = Instance.GetBasePort();
+
+ std::shared_ptr<StorageServerInstance::ExclusiveLockedPtr> SharedInstancePtr =
+ std::make_shared<StorageServerInstance::ExclusiveLockedPtr>(std::move(Instance));
m_BackgroundWorkLatch.AddCount(1);
try
{
m_WorkerPool->ScheduleWork(
- [this,
- ModuleId = std::string(ModuleId),
- Instance = std::make_shared<StorageServerInstance::ExclusiveLockedPtr>(std::move(Instance)),
- RawInstance = std::shared_ptr<StorageServerInstance>(std::move(RawInstance))]() mutable {
+ [this, ModuleId = std::string(ModuleId), Instance = std::move(SharedInstancePtr)]() mutable {
auto _ = MakeGuard([this]() { m_BackgroundWorkLatch.CountDown(); });
try
{
CompleteDeprovision(*Instance);
- RawInstance.reset();
}
catch (const std::exception& Ex)
{
@@ -578,19 +548,10 @@ Hub::InternalDeprovision(const std::string& ModuleId)
ZEN_ERROR("Failed async dispatch deprovision of module '{}': {}", ModuleId, DispatchEx.what());
m_BackgroundWorkLatch.CountDown();
- HubInstanceState OldState = Instance.GetState();
- HubInstanceState NewState = OldState;
- uint16_t BasePort = Instance.GetBasePort();
InstanceStateUpdateGuard StateGuard(*this, ModuleId, OldState, NewState, BasePort, /*BaseUri*/ {});
-
- // The module is already removed from m_InstanceLookup; treat as gone so callbacks fire correctly.
- Instance = {};
- NewState = HubInstanceState::Unprovisioned;
-
{
RwLock::ExclusiveLockScope _(m_Lock);
m_DeprovisioningModules.erase(std::string(ModuleId));
- m_FreePorts.push_back(BasePort);
}
throw;
}
@@ -598,7 +559,6 @@ Hub::InternalDeprovision(const std::string& ModuleId)
else
{
CompleteDeprovision(Instance);
- RawInstance.reset();
}
return Response{m_WorkerPool ? EResponseCode::Accepted : EResponseCode::Completed};
@@ -614,14 +574,6 @@ Hub::CompleteDeprovision(StorageServerInstance::ExclusiveLockedPtr& Instance)
HubInstanceState NewState = OldState;
InstanceStateUpdateGuard StateGuard(*this, ModuleId, OldState, NewState, BasePort, BaseUri);
- auto _ = MakeGuard([&] {
- {
- RwLock::ExclusiveLockScope _(m_Lock);
- m_DeprovisioningModules.erase(std::string(ModuleId));
- m_FreePorts.push_back(BasePort);
- }
- });
-
try
{
(void)Instance.Deprovision();
@@ -631,11 +583,21 @@ Hub::CompleteDeprovision(StorageServerInstance::ExclusiveLockedPtr& Instance)
catch (const std::exception& Ex)
{
ZEN_ERROR("Failed to deprovision storage server instance for module '{}': {}", ModuleId, Ex.what());
- // The module is already removed from m_InstanceLookup; treat as gone so callbacks fire correctly.
- NewState = HubInstanceState::Unprovisioned;
+ NewState = Instance.GetState();
Instance = {};
- throw;
}
+
+ std::unique_ptr<StorageServerInstance> DeleteInstance;
+ {
+ RwLock::ExclusiveLockScope _(m_Lock);
+ m_DeprovisioningModules.erase(std::string(ModuleId));
+ auto It = m_InstanceLookup.find(std::string(ModuleId));
+ ZEN_ASSERT(It != m_InstanceLookup.end());
+ DeleteInstance = std::move(m_ActiveInstances[It->second].Instance);
+ m_FreeActiveInstanceIndexes.push_back(It->second);
+ m_InstanceLookup.erase(It);
+ }
+ DeleteInstance.reset();
}
Hub::Response
@@ -1021,16 +983,6 @@ Hub::CanProvisionInstance(std::string_view ModuleId, std::string& OutReason)
return false;
}
- // Since deprovisioning happens outside the lock and we don't return the port until the instance is fully shut down, we might be below
- // the instance count limit but with no free ports available
- if (m_FreePorts.empty())
- {
- OutReason = fmt::format("no free ports available, deprovisioning of instances might be in flight ({})",
- m_Config.InstanceLimit - m_InstanceLookup.size());
-
- return false;
- }
-
// TODO: handle additional resource metrics
return true;
@@ -1074,6 +1026,7 @@ Hub::AttemptRecoverInstance(std::string_view ModuleId)
{
return;
}
+ ZEN_ASSERT(!m_RecoveringModules.contains(std::string(ModuleId)));
m_RecoveringModules.emplace(std::string(ModuleId));
}
@@ -1085,59 +1038,37 @@ Hub::AttemptRecoverInstance(std::string_view ModuleId)
HubInstanceState OldState = Instance.GetState();
HubInstanceState NewState = OldState;
- auto RemoveRecoveringModule = MakeGuard([&] {
- RwLock::ExclusiveLockScope _(m_Lock);
- m_RecoveringModules.erase(std::string(ModuleId));
- });
-
- if (Instance.RecoverFromCrash())
- {
- // Spawn succeeded -- instance is back in Provisioned state.
- NewState = Instance.GetState();
- Instance = {};
- OnStateUpdate(ModuleId, OldState, NewState, BasePort, BaseUri);
- return;
- }
-
- // Spawn failed -- instance is now in Crashed state. Dehydrate before tearing down
- // so any salvageable data is preserved.
+ // Instance is still crashed. Dehydrate before tearing down so any salvageable data is preserved.
try
{
(void)Instance.Deprovision();
+ Instance = {};
}
catch (const std::exception& Ex)
{
ZEN_ERROR("Failed to deprovision instance for module '{}' during crash recovery cleanup: {}", ModuleId, Ex.what());
+ Instance = {};
}
- Instance = {};
std::unique_ptr<StorageServerInstance> DestroyInstance;
{
RwLock::ExclusiveLockScope _(m_Lock);
- if (auto It = m_InstanceLookup.find(std::string(ModuleId)); It != m_InstanceLookup.end())
- {
- const size_t ActiveInstanceIndex = It->second;
- ZEN_ASSERT(ActiveInstanceIndex < m_ActiveInstances.size());
- DestroyInstance = std::move(m_ActiveInstances[ActiveInstanceIndex].Instance);
- m_FreeActiveInstanceIndexes.push_back(ActiveInstanceIndex);
- m_InstanceLookup.erase(It);
- }
- m_FreePorts.push_back(BasePort);
+ auto It = m_InstanceLookup.find(std::string(ModuleId));
+ ZEN_ASSERT(It != m_InstanceLookup.end());
+
+ const size_t ActiveInstanceIndex = It->second;
+ ZEN_ASSERT(ActiveInstanceIndex < m_ActiveInstances.size());
+ DestroyInstance = std::move(m_ActiveInstances[ActiveInstanceIndex].Instance);
+ m_FreeActiveInstanceIndexes.push_back(ActiveInstanceIndex);
+ m_InstanceLookup.erase(It);
m_RecoveringModules.erase(std::string(ModuleId));
}
- RemoveRecoveringModule.Dismiss();
- try
- {
- DestroyInstance.reset();
- NewState = HubInstanceState::Unprovisioned;
- }
- catch (const std::exception& Ex)
- {
- ZEN_ERROR("Failed to destroy recovered instance for module '{}': {}", ModuleId, Ex.what());
- }
+ DestroyInstance.reset();
- // Notify after all cleanup -- port is back in m_FreePorts and the callback sees
+ NewState = HubInstanceState::Unprovisioned;
+
+ // Notify after all cleanup
// a consistent end-state: module gone, transition complete.
OnStateUpdate(ModuleId, OldState, NewState, BasePort, BaseUri);
}
@@ -1288,6 +1219,42 @@ namespace hub_testutils {
}
};
+ // Poll until Find() returns false for the given module (i.e. async deprovision completes).
+ static bool WaitForInstanceGone(Hub& HubInstance,
+ std::string_view ModuleId,
+ std::chrono::milliseconds PollInterval = std::chrono::milliseconds(200),
+ std::chrono::seconds Timeout = std::chrono::seconds(30))
+ {
+ const auto Deadline = std::chrono::steady_clock::now() + Timeout;
+ while (std::chrono::steady_clock::now() < Deadline)
+ {
+ if (!HubInstance.Find(ModuleId))
+ {
+ return true;
+ }
+ std::this_thread::sleep_for(PollInterval);
+ }
+ return !HubInstance.Find(ModuleId);
+ }
+
+ // Poll until GetInstanceCount() reaches ExpectedCount (i.e. all async deprovisions complete).
+ static bool WaitForInstanceCount(Hub& HubInstance,
+ int ExpectedCount,
+ std::chrono::milliseconds PollInterval = std::chrono::milliseconds(200),
+ std::chrono::seconds Timeout = std::chrono::seconds(30))
+ {
+ const auto Deadline = std::chrono::steady_clock::now() + Timeout;
+ while (std::chrono::steady_clock::now() < Deadline)
+ {
+ if (HubInstance.GetInstanceCount() == ExpectedCount)
+ {
+ return true;
+ }
+ std::this_thread::sleep_for(PollInterval);
+ }
+ return HubInstance.GetInstanceCount() == ExpectedCount;
+ }
+
} // namespace hub_testutils
TEST_CASE("hub.provision_basic")
@@ -1844,12 +1811,12 @@ TEST_CASE("hub.async_hibernate_wake")
CHECK(ModClient.Get("/health/"));
}
- // Deprovision
+ // Deprovision asynchronously and poll until the instance is gone
{
const Hub::Response R = HubInstance->Deprovision("async_hib_a");
CHECK_MESSAGE(R.ResponseCode == Hub::EResponseCode::Accepted, R.Message);
}
- CHECK_FALSE(HubInstance->Find("async_hib_a"));
+ REQUIRE_MESSAGE(hub_testutils::WaitForInstanceGone(*HubInstance, "async_hib_a"), "Instance did not deprovision within timeout");
}
TEST_CASE("hub.recover_process_crash")
@@ -2018,7 +1985,7 @@ TEST_CASE("hub.async_provision_concurrent")
const Hub::Response DepResp = HubInstance->Deprovision(fmt::format("async_c{}", I));
CHECK_MESSAGE(DepResp.ResponseCode == Hub::EResponseCode::Accepted, DepResp.Message);
}
- CHECK_EQ(HubInstance->GetInstanceCount(), 0);
+ REQUIRE_MESSAGE(hub_testutils::WaitForInstanceCount(*HubInstance, 0), "Not all instances deprovisioned within timeout");
}
TEST_CASE("hub.async_provision_shutdown_waits")
diff --git a/src/zenserver/hub/hub.h b/src/zenserver/hub/hub.h
index c94612621..a3f85a8c5 100644
--- a/src/zenserver/hub/hub.h
+++ b/src/zenserver/hub/hub.h
@@ -176,15 +176,15 @@ private:
struct ActiveInstance
{
std::unique_ptr<StorageServerInstance> Instance;
- std::atomic<HubInstanceState> State = HubInstanceState::Unprovisioned;
+ // std::atomic<HubInstanceState> State = HubInstanceState::Unprovisioned; // TODO: Move state from StorageServerInstance to
+ // here
};
std::vector<ActiveInstance> m_ActiveInstances;
- std::vector<size_t> m_FreeActiveInstanceIndexes;
+ std::deque<size_t> m_FreeActiveInstanceIndexes;
ResourceMetrics m_ResourceLimits;
SystemMetrics m_HostMetrics;
std::atomic<int> m_MaxInstanceCount = 0;
- std::deque<uint16_t> m_FreePorts;
std::thread m_WatchDog;
Event m_WatchDogEvent;
@@ -197,7 +197,7 @@ private:
bool IsModuleInFlightLocked(std::string_view ModuleId) const;
Response InternalDeprovision(const std::string& ModuleId);
- void CompleteProvision(StorageServerInstance::ExclusiveLockedPtr& Instance, uint16_t AllocatedPort, bool IsNewInstance);
+ void CompleteProvision(StorageServerInstance::ExclusiveLockedPtr& Instance, bool IsNewInstance);
void AbortProvision(std::string_view ModuleId);
void CompleteDeprovision(StorageServerInstance::ExclusiveLockedPtr& Instance);
void CompleteHibernate(StorageServerInstance::ExclusiveLockedPtr& Instance);
diff --git a/src/zenserver/hub/storageserverinstance.cpp b/src/zenserver/hub/storageserverinstance.cpp
index 99f0c29f3..4710aba1e 100644
--- a/src/zenserver/hub/storageserverinstance.cpp
+++ b/src/zenserver/hub/storageserverinstance.cpp
@@ -132,15 +132,8 @@ StorageServerInstance::DeprovisionLocked()
m_State = HubInstanceState::Deprovisioning;
if (CurrentState == HubInstanceState::Provisioned)
{
- try
- {
- m_ServerInstance.Shutdown();
- }
- catch (...)
- {
- m_State = HubInstanceState::Provisioned; // Shutdown failed; process may still be running
- throw;
- }
+ // m_ServerInstance.Shutdown() never throws unless there is a programming error (ZEN_ASSERT)
+ m_ServerInstance.Shutdown();
}
// Crashed or Hibernated: process already dead; skip Shutdown
@@ -148,10 +141,9 @@ StorageServerInstance::DeprovisionLocked()
{
Dehydrate();
}
- catch (...)
+ catch (const std::exception& Ex)
{
- m_State = HubInstanceState::Crashed; // Dehydrate failed; process is already dead
- throw;
+ ZEN_WARN("Dehydration of module {} failed, current state not saved. Reason: {}", m_ModuleId, Ex.what());
}
m_State = HubInstanceState::Unprovisioned;
@@ -178,17 +170,11 @@ StorageServerInstance::HibernateLocked()
}
m_State = HubInstanceState::Hibernating;
- try
- {
- m_ServerInstance.Shutdown();
- m_State = HubInstanceState::Hibernated;
- return true;
- }
- catch (...)
- {
- m_State = HubInstanceState::Provisioned; // Shutdown failed; instance is still running
- throw;
- }
+
+ // m_ServerInstance.Shutdown() never throws unless there is a programming error (ZEN_ASSERT)
+ m_ServerInstance.Shutdown();
+ m_State = HubInstanceState::Hibernated;
+ return true;
}
bool