aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
author: Dan Engelbrecht <[email protected]> 2026-03-20 13:44:00 +0100
committer: GitHub Enterprise <[email protected]> 2026-03-20 13:44:00 +0100
commit: 7cc4b1701aa2923573adabceed486229abba5a2d (patch)
tree: 04a1b5eddcabd24e5c5a50a817fa50c5829972f2 /src
parent: Zs/consul token (#870) (diff)
download: zen-7cc4b1701aa2923573adabceed486229abba5a2d.tar.xz
zen-7cc4b1701aa2923573adabceed486229abba5a2d.zip
add hub instance info (#869)
- Improvement: Hub module listing now includes per-instance process metrics (memory, CPU time, working set, pagefile usage)
- Improvement: Hub now monitors provisioned instance health in the background and refreshes process metrics periodically
- Improvement: Hub no longer exposes raw `StorageServerInstance` pointers to callers; instance state is returned as value snapshots (`Hub::InstanceInfo`)
- Improvement: Hub instance access is now guarded by RAII per-instance locks (`SharedLockedPtr`/`ExclusiveLockedPtr`), preventing concurrent modifications during provisioning and deprovisioning
- Improvement: Hub instance lifecycle is now tracked as a `HubInstanceState` enum covering transitional states (Provisioning, Deprovisioning, Hibernating, Waking); exposed as a string in the HTTP API and dashboard
Diffstat (limited to 'src')
-rw-r--r--src/zencore/include/zencore/process.h2
-rw-r--r--src/zencore/include/zencore/thread.h2
-rw-r--r--src/zencore/process.cpp2
-rw-r--r--src/zencore/thread.cpp12
-rw-r--r--src/zenserver-test/hub-tests.cpp12
-rw-r--r--src/zenserver-test/projectstore-tests.cpp13
-rw-r--r--src/zenserver/frontend/html/compute/hub.html6
-rw-r--r--src/zenserver/frontend/html/pages/hub.js2
-rw-r--r--src/zenserver/hub/httphubservice.cpp47
-rw-r--r--src/zenserver/hub/hub.cpp385
-rw-r--r--src/zenserver/hub/hub.h56
-rw-r--r--src/zenserver/hub/hubinstancestate.cpp33
-rw-r--r--src/zenserver/hub/hubinstancestate.h23
-rw-r--r--src/zenserver/hub/storageserverinstance.cpp293
-rw-r--r--src/zenserver/hub/storageserverinstance.h129
-rw-r--r--src/zenserver/hub/zenhubserver.h4
-rw-r--r--src/zenutil/include/zenutil/zenserverprocess.h27
-rw-r--r--src/zenutil/zenserverprocess.cpp2
18 files changed, 845 insertions, 205 deletions
diff --git a/src/zencore/include/zencore/process.h b/src/zencore/include/zencore/process.h
index 3177f64c1..96afd5950 100644
--- a/src/zencore/include/zencore/process.h
+++ b/src/zencore/include/zencore/process.h
@@ -176,7 +176,7 @@ struct ProcessMetrics
uint64_t PeakPagefileUsage = 0;
};
-void GetProcessMetrics(ProcessHandle& Handle, ProcessMetrics& OutMetrics);
+void GetProcessMetrics(const ProcessHandle& Handle, ProcessMetrics& OutMetrics);
void process_forcelink(); // internal
diff --git a/src/zencore/include/zencore/thread.h b/src/zencore/include/zencore/thread.h
index d0d710ee8..d7262324f 100644
--- a/src/zencore/include/zencore/thread.h
+++ b/src/zencore/include/zencore/thread.h
@@ -28,9 +28,11 @@ class RwLock
{
public:
void AcquireShared() noexcept;
+ bool TryAcquireShared() noexcept;
void ReleaseShared() noexcept;
void AcquireExclusive() noexcept;
+ bool TryAcquireExclusive() noexcept;
void ReleaseExclusive() noexcept;
struct SharedLockScope
diff --git a/src/zencore/process.cpp b/src/zencore/process.cpp
index 0c55e6c7e..29de107bd 100644
--- a/src/zencore/process.cpp
+++ b/src/zencore/process.cpp
@@ -1564,7 +1564,7 @@ WaitForThreads(uint64_t WaitTimeMs)
}
void
-GetProcessMetrics(ProcessHandle& Handle, ProcessMetrics& OutMetrics)
+GetProcessMetrics(const ProcessHandle& Handle, ProcessMetrics& OutMetrics)
{
#if ZEN_PLATFORM_WINDOWS
FILETIME CreationTime;
diff --git a/src/zencore/thread.cpp b/src/zencore/thread.cpp
index 54459cbaa..f74791333 100644
--- a/src/zencore/thread.cpp
+++ b/src/zencore/thread.cpp
@@ -146,6 +146,12 @@ RwLock::AcquireShared() noexcept
m_Mutex.lock_shared();
}
+bool
+RwLock::TryAcquireShared() noexcept  // Try-variant of AcquireShared(): calls m_Mutex.try_lock_shared() instead of lock_shared() and returns its result
+{
+ return m_Mutex.try_lock_shared();  // presumably true means the shared lock is now held and must be paired with ReleaseShared() - confirm against m_Mutex's type
+}
+
void
RwLock::ReleaseShared() noexcept
{
@@ -158,6 +164,12 @@ RwLock::AcquireExclusive() noexcept
m_Mutex.lock();
}
+bool
+RwLock::TryAcquireExclusive() noexcept  // Try-variant of AcquireExclusive(): calls m_Mutex.try_lock() instead of lock() and returns its result
+{
+ return m_Mutex.try_lock();  // presumably true means the exclusive lock is now held and must be paired with ReleaseExclusive() - confirm against m_Mutex's type
+}
+
void
RwLock::ReleaseExclusive() noexcept
{
diff --git a/src/zenserver-test/hub-tests.cpp b/src/zenserver-test/hub-tests.cpp
index 29c3b76ba..a372b11e5 100644
--- a/src/zenserver-test/hub-tests.cpp
+++ b/src/zenserver-test/hub-tests.cpp
@@ -44,7 +44,8 @@ TEST_CASE("hub.lifecycle.basic")
HttpClient Client(Instance.GetBaseUri() + "/hub/");
HttpClient::Response Result = Client.Get("status");
- CHECK(Result);
+ REQUIRE(Result);
+ CHECK_EQ(Result.AsObject()["modules"].AsArrayView().Num(), 0u);
}
}
@@ -71,6 +72,10 @@ TEST_CASE("hub.lifecycle.children")
const uint16_t AbcPort = AbcResult["port"].AsUInt16(0);
CHECK_NE(AbcPort, 0);
+ Result = Client.Get("modules/abc");
+ REQUIRE(Result);
+ CHECK_EQ(Result.AsObject()["state"].AsString(), "provisioned"sv);
+
// This should be a fresh instance with no contents
HttpClient AbcClient(fmt::format("http://localhost:{}", AbcPort));
@@ -231,9 +236,10 @@ TEST_CASE("hub.lifecycle.children")
Result = Client.Post("modules/def/deprovision");
REQUIRE(Result);
- // final sanity check that the hub is still responsive
+ // final sanity check that the hub is still responsive and all modules are gone
Result = Client.Get("status");
- CHECK(Result);
+ REQUIRE(Result);
+ CHECK_EQ(Result.AsObject()["modules"].AsArrayView().Num(), 0u);
}
}
diff --git a/src/zenserver-test/projectstore-tests.cpp b/src/zenserver-test/projectstore-tests.cpp
index 5cc75c590..a37ecb6be 100644
--- a/src/zenserver-test/projectstore-tests.cpp
+++ b/src/zenserver-test/projectstore-tests.cpp
@@ -1027,7 +1027,7 @@ TEST_CASE("project.rpcappendop")
std::string_view ProjectName,
std::string_view OplogName,
std::span<const CompressedBuffer> Attachments,
- void* ServerProcessHandle,
+ const ProcessHandle& ServerProcessHandle,
const std::filesystem::path& TempPath) {
CompositeBuffer PackageMessage;
{
@@ -1054,7 +1054,8 @@ TEST_CASE("project.rpcappendop")
Request.EndArray(); // "chunks"
RequestPackage.SetObject(Request.Save());
- PackageMessage = CompositeBuffer(FormatPackageMessage(RequestPackage, FormatFlags::kAllowLocalReferences, ServerProcessHandle));
+ PackageMessage =
+ CompositeBuffer(FormatPackageMessage(RequestPackage, FormatFlags::kAllowLocalReferences, ServerProcessHandle.Handle()));
}
HttpClient::Response Response =
@@ -1063,8 +1064,8 @@ TEST_CASE("project.rpcappendop")
};
{
- HttpClient Client(Servers.GetInstance(0).GetBaseUri());
- void* ServerProcessHandle = Servers.GetInstance(0).GetProcessHandle();
+ HttpClient Client(Servers.GetInstance(0).GetBaseUri());
+ const ProcessHandle& ServerProcessHandle = Servers.GetInstance(0).GetProcessHandle();
MakeProject(Client, "proj0");
MakeOplog(Client, "proj0", "oplog0");
@@ -1108,8 +1109,8 @@ TEST_CASE("project.rpcappendop")
}
{
- HttpClient Client(Servers.GetInstance(1).GetBaseUri());
- void* ServerProcessHandle = nullptr; // Force use of path for attachments passed on disk
+ HttpClient Client(Servers.GetInstance(1).GetBaseUri());
+ ProcessHandle ServerProcessHandle; // Force use of path for attachments passed on disk
MakeProject(Client, "proj0");
MakeOplog(Client, "proj0", "oplog0");
diff --git a/src/zenserver/frontend/html/compute/hub.html b/src/zenserver/frontend/html/compute/hub.html
index 620349a2b..b15b34577 100644
--- a/src/zenserver/frontend/html/compute/hub.html
+++ b/src/zenserver/frontend/html/compute/hub.html
@@ -127,11 +127,11 @@
for (var i = 0; i < modules.length; i++) {
var m = modules[i];
var moduleId = m.moduleId || '';
- var provisioned = m.provisioned;
+ var state = m.state || 'unprovisioned';
- var badge = provisioned
+ var badge = (state === 'provisioned')
? '<span class="status-badge active">Provisioned</span>'
- : '<span class="status-badge inactive">Inactive</span>';
+ : '<span class="status-badge inactive">' + escapeHtml(state.charAt(0).toUpperCase() + state.slice(1)) + '</span>';
var tr = document.createElement('tr');
tr.innerHTML =
diff --git a/src/zenserver/frontend/html/pages/hub.js b/src/zenserver/frontend/html/pages/hub.js
index f9e4fff33..00d156c5e 100644
--- a/src/zenserver/frontend/html/pages/hub.js
+++ b/src/zenserver/frontend/html/pages/hub.js
@@ -104,7 +104,7 @@ export class Page extends ZenPage
{
this._mod_table.add_row(
m.moduleId || "",
- m.provisioned ? "provisioned" : "inactive",
+ m.state || "unprovisioned",
);
}
}
diff --git a/src/zenserver/hub/httphubservice.cpp b/src/zenserver/hub/httphubservice.cpp
index 67ed0cfd8..5497bcf2b 100644
--- a/src/zenserver/hub/httphubservice.cpp
+++ b/src/zenserver/hub/httphubservice.cpp
@@ -37,10 +37,23 @@ HttpHubService::HttpHubService(Hub& Hub) : m_Hub(Hub)
[this](HttpRouterRequest& Req) {
CbObjectWriter Obj;
Obj.BeginArray("modules");
- m_Hub.EnumerateModules([&Obj](StorageServerInstance& Instance) {
+ m_Hub.EnumerateModules([&Obj](std::string_view ModuleId, const Hub::InstanceInfo& Info) {
Obj.BeginObject();
- Obj << "moduleId" << Instance.GetModuleId();
- Obj << "provisioned" << Instance.IsProvisioned();
+ {
+ Obj << "moduleId" << ModuleId;
+ Obj << "state" << ToString(Info.State);
+ Obj.BeginObject("process_metrics");
+ {
+ Obj << "MemoryBytes" << Info.Metrics.MemoryBytes;
+ Obj << "KernelTimeMs" << Info.Metrics.KernelTimeMs;
+ Obj << "UserTimeMs" << Info.Metrics.UserTimeMs;
+ Obj << "WorkingSetSize" << Info.Metrics.WorkingSetSize;
+ Obj << "PeakWorkingSetSize" << Info.Metrics.PeakWorkingSetSize;
+ Obj << "PagefileUsage" << Info.Metrics.PagefileUsage;
+ Obj << "PeakPagefileUsage" << Info.Metrics.PeakPagefileUsage;
+ }
+ Obj.EndObject();
+ }
Obj.EndObject();
});
Obj.EndArray();
@@ -176,8 +189,8 @@ HttpHubService::HandleRequest(zen::HttpServerRequest& Request)
void
HttpHubService::HandleModuleGet(HttpServerRequest& Request, std::string_view ModuleId)
{
- StorageServerInstance* Instance = nullptr;
- if (!m_Hub.Find(ModuleId, &Instance))
+ Hub::InstanceInfo InstanceInfo;
+ if (!m_Hub.Find(ModuleId, &InstanceInfo))
{
Request.WriteResponse(HttpResponseCode::NotFound);
return;
@@ -187,26 +200,36 @@ HttpHubService::HandleModuleGet(HttpServerRequest& Request, std::string_view Mod
// with a dangling pointer...
CbObjectWriter Obj;
- Obj << "moduleId" << Instance->GetModuleId();
- Obj << "provisioned" << Instance->IsProvisioned();
+ Obj << "moduleId" << ModuleId;
+ Obj << "state" << ToString(InstanceInfo.State);
Request.WriteResponse(HttpResponseCode::OK, Obj.Save());
}
+// Handles a delete request for a single module.
+//
+// Responds NotFound when the hub does not know ModuleId. A module that is currently
+// Provisioned is deprovisioned first; if that fails the hub's reason text is returned with
+// InternalServerError. On success the response carries the module id and its state.
void
HttpHubService::HandleModuleDelete(HttpServerRequest& Request, std::string_view ModuleId)
{
- StorageServerInstance* Instance = nullptr;
- if (!m_Hub.Find(ModuleId, &Instance))
+ Hub::InstanceInfo InstanceInfo;
+ if (!m_Hub.Find(ModuleId, &InstanceInfo))
{
Request.WriteResponse(HttpResponseCode::NotFound);
return;
}
- // TODO: deprovision and nuke all related storage
+ if (InstanceInfo.State == HubInstanceState::Provisioned)
+ {
+ std::string Reason;
+ if (!m_Hub.Deprovision(std::string(ModuleId), Reason))
+ {
+ Request.WriteResponse(HttpResponseCode::InternalServerError, HttpContentType::kText, Reason);
+ return;
+ }
+
+ // InstanceInfo is a snapshot taken before the deprovision above; update it so the
+ // response does not claim the module is still provisioned.
+ InstanceInfo.State = HubInstanceState::Unprovisioned;
+ }
+
+ // TODO: nuke all related storage
CbObjectWriter Obj;
- Obj << "moduleId" << Instance->GetModuleId();
- Obj << "provisioned" << Instance->IsProvisioned();
+ Obj << "moduleId" << ModuleId;
+ Obj << "state" << ToString(InstanceInfo.State);
Request.WriteResponse(HttpResponseCode::OK, Obj.Save());
}
diff --git a/src/zenserver/hub/hub.cpp b/src/zenserver/hub/hub.cpp
index c35fa61e8..b0208db1f 100644
--- a/src/zenserver/hub/hub.cpp
+++ b/src/zenserver/hub/hub.cpp
@@ -9,6 +9,7 @@
#include <zencore/fmtutils.h>
#include <zencore/logging.h>
#include <zencore/scopeguard.h>
+#include <zencore/timer.h>
ZEN_THIRD_PARTY_INCLUDES_START
#include <EASTL/fixed_vector.h>
@@ -150,6 +151,9 @@ Hub::Hub(const Configuration& Config,
ZEN_ASSERT(uint64_t(Config.BasePortNumber) + Config.InstanceLimit <= std::numeric_limits<uint16_t>::max());
+ m_InstanceLookup.reserve(Config.InstanceLimit);
+ m_ActiveInstances.reserve(Config.InstanceLimit);
+
m_FreePorts.resize(Config.InstanceLimit);
std::iota(m_FreePorts.begin(), m_FreePorts.end(), Config.BasePortNumber);
@@ -167,6 +171,7 @@ Hub::Hub(const Configuration& Config,
}
}
#endif
+ m_WatchDog = std::thread([this]() { WatchDog(); });
}
Hub::~Hub()
@@ -175,26 +180,43 @@ Hub::~Hub()
{
ZEN_INFO("Hub service shutting down, deprovisioning any current instances");
+ m_WatchDogEvent.Set();
+ if (m_WatchDog.joinable())
+ {
+ m_WatchDog.join();
+ }
+
+ m_WatchDog = {};
+
+ // WatchDog has been joined; no concurrent access is possible
m_Lock.WithExclusiveLock([this] {
- for (auto& [ModuleId, Instance] : m_Instances)
+ for (auto& [ModuleId, ActiveInstanceIndex] : m_InstanceLookup)
{
- uint16_t BasePort = Instance->GetBasePort();
- std::string BaseUri; // TODO?
-
- if (m_DeprovisionedModuleCallback)
+ std::unique_ptr<StorageServerInstance>& InstanceRaw = m_ActiveInstances[ActiveInstanceIndex];
{
- try
- {
- m_DeprovisionedModuleCallback(ModuleId, HubProvisionedInstanceInfo{.BaseUri = BaseUri, .Port = BasePort});
- }
- catch (const std::exception& Ex)
+ StorageServerInstance::ExclusiveLockedPtr Instance(InstanceRaw->LockExclusive(/*Wait*/ true));
+
+ uint16_t BasePort = InstanceRaw->GetBasePort();
+ std::string BaseUri; // TODO?
+
+ if (m_DeprovisionedModuleCallback)
{
- ZEN_ERROR("Deprovision callback for module {} failed. Reason: '{}'", ModuleId, Ex.what());
+ try
+ {
+ m_DeprovisionedModuleCallback(ModuleId, HubProvisionedInstanceInfo{.BaseUri = BaseUri, .Port = BasePort});
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("Deprovision callback for module {} failed. Reason: '{}'", ModuleId, Ex.what());
+ }
}
+ Instance.Deprovision();
}
- Instance->Deprovision();
+ InstanceRaw.reset();
}
- m_Instances.clear();
+ m_InstanceLookup.clear();
+ m_ActiveInstances.clear();
+ m_FreeActiveInstanceIndexes.clear();
});
}
catch (const std::exception& e)
@@ -206,20 +228,20 @@ Hub::~Hub()
bool
Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo, std::string& OutReason)
{
- StorageServerInstance* Instance = nullptr;
- bool IsNewInstance = false;
+ StorageServerInstance::ExclusiveLockedPtr Instance;
+ bool IsNewInstance = false;
+ uint16_t AllocatedPort = 0;
{
RwLock::ExclusiveLockScope _(m_Lock);
- uint16_t AllocatedPort = 0;
- auto RestoreAllocatedPort = MakeGuard([this, &AllocatedPort]() {
- if (AllocatedPort != 0)
+ auto RestoreAllocatedPort = MakeGuard([this, ModuleId, &IsNewInstance, &AllocatedPort]() {
+ if (IsNewInstance && AllocatedPort != 0 && !m_InstanceLookup.contains(std::string(ModuleId)))
{
m_FreePorts.push_back(AllocatedPort);
AllocatedPort = 0;
}
});
- if (auto It = m_Instances.find(std::string(ModuleId)); It == m_Instances.end())
+ if (auto It = m_InstanceLookup.find(std::string(ModuleId)); It == m_InstanceLookup.end())
{
std::string Reason;
if (!CanProvisionInstance(ModuleId, /* out */ Reason))
@@ -231,11 +253,12 @@ Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo, s
return false;
}
+ IsNewInstance = true;
+
AllocatedPort = m_FreePorts.front();
+ ZEN_ASSERT(AllocatedPort != 0);
m_FreePorts.pop_front();
- IsNewInstance = true;
-
auto NewInstance = std::make_unique<StorageServerInstance>(
m_RunEnvironment,
StorageServerInstance::Configuration{.BasePort = AllocatedPort,
@@ -245,63 +268,110 @@ Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo, s
.CoreLimit = m_Config.InstanceCoreLimit,
.ConfigPath = m_Config.InstanceConfigPath},
ModuleId);
+
#if ZEN_PLATFORM_WINDOWS
if (m_JobObject.IsValid())
{
NewInstance->SetJobObject(&m_JobObject);
}
#endif
- Instance = NewInstance.get();
- m_Instances.emplace(std::string(ModuleId), std::move(NewInstance));
- AllocatedPort = 0;
+
+ Instance = NewInstance->LockExclusive(/*Wait*/ true);
+
+ size_t ActiveInstanceIndex = (size_t)-1;
+ if (!m_FreeActiveInstanceIndexes.empty())
+ {
+ ActiveInstanceIndex = m_FreeActiveInstanceIndexes.back();
+ m_FreeActiveInstanceIndexes.pop_back();
+ ZEN_ASSERT(m_ActiveInstances.size() > ActiveInstanceIndex);
+ m_ActiveInstances[ActiveInstanceIndex] = std::move(NewInstance);
+ }
+ else
+ {
+ ActiveInstanceIndex = m_ActiveInstances.size();
+ m_ActiveInstances.emplace_back(std::move(NewInstance));
+ }
+ ZEN_ASSERT(ActiveInstanceIndex != (size_t)-1);
+ m_InstanceLookup.insert_or_assign(std::string(ModuleId), ActiveInstanceIndex);
ZEN_INFO("Created new storage server instance for module '{}'", ModuleId);
+
+ const int CurrentInstanceCount = gsl::narrow_cast<int>(m_InstanceLookup.size());
+ int CurrentMaxCount = m_MaxInstanceCount.load();
+ const int NewMax = Max(CurrentMaxCount, CurrentInstanceCount);
+
+ m_MaxInstanceCount.compare_exchange_weak(CurrentMaxCount, NewMax);
}
else
{
- Instance = It->second.get();
+ const size_t ActiveInstanceIndex = It->second;
+ ZEN_ASSERT(m_ActiveInstances.size() > ActiveInstanceIndex);
+
+ std::unique_ptr<StorageServerInstance>& InstanceRaw = m_ActiveInstances[ActiveInstanceIndex];
+ Instance = InstanceRaw->LockExclusive(/*Wait*/ true);
+ AllocatedPort = InstanceRaw->GetBasePort();
}
m_ProvisioningModules.emplace(std::string(ModuleId));
}
- ZEN_ASSERT(Instance != nullptr);
+ ZEN_ASSERT(Instance);
auto RemoveProvisioningModule = MakeGuard([&] {
RwLock::ExclusiveLockScope _(m_Lock);
m_ProvisioningModules.erase(std::string(ModuleId));
+ if (IsNewInstance && AllocatedPort != 0 && !m_InstanceLookup.contains(std::string(ModuleId)))
+ {
+ m_FreePorts.push_back(AllocatedPort);
+ AllocatedPort = 0;
+ }
});
- // NOTE: this is done while not holding the lock, as provisioning may take time
+ // NOTE: this is done while not holding the hub lock, as provisioning may take time
// and we don't want to block other operations. We track which modules are being
// provisioned using m_ProvisioningModules, and reject attempts to provision/deprovision
// those modules while in this state.
- UpdateStats();
-
try
{
- Instance->Provision();
+ Instance.Provision();
+ Instance = {};
}
catch (const std::exception& Ex)
{
ZEN_ERROR("Failed to provision storage server instance for module '{}': {}", ModuleId, Ex.what());
+ Instance = {};
+
if (IsNewInstance)
{
- // Clean up
- RwLock::ExclusiveLockScope _(m_Lock);
- if (auto It = m_Instances.find(std::string(ModuleId)); It != m_Instances.end())
+ // Clean up failed instance provisioning
+ std::unique_ptr<StorageServerInstance> DestroyInstance;
+ {
+ RwLock::ExclusiveLockScope _(m_Lock);
+ if (auto It = m_InstanceLookup.find(std::string(ModuleId)); It != m_InstanceLookup.end())
+ {
+ const size_t ActiveInstanceIndex = It->second;
+ ZEN_ASSERT(ActiveInstanceIndex < m_ActiveInstances.size());
+ DestroyInstance = std::move(m_ActiveInstances[ActiveInstanceIndex]);
+ ZEN_ASSERT(DestroyInstance);
+ ZEN_ASSERT(!m_ActiveInstances[ActiveInstanceIndex]);
+ m_FreeActiveInstanceIndexes.push_back(ActiveInstanceIndex);
+ m_InstanceLookup.erase(It);
+ }
+ }
+ try
+ {
+ DestroyInstance.reset();
+ }
+ catch (const std::exception& Ex)
{
- ZEN_ASSERT(It->second != nullptr);
- uint16_t BasePort = It->second->GetBasePort();
- m_FreePorts.push_back(BasePort);
- m_Instances.erase(It);
+ ZEN_ERROR("Failed to destroy instance for failed provision module '{}': {}", ModuleId, Ex.what());
}
}
return false;
}
- OutInfo.Port = Instance->GetBasePort();
+ OutInfo.Port = AllocatedPort;
// TODO: base URI? Would need to know what host name / IP to use
if (m_ProvisionedModuleCallback)
@@ -322,7 +392,8 @@ Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo, s
bool
Hub::Deprovision(const std::string& ModuleId, std::string& OutReason)
{
- std::unique_ptr<StorageServerInstance> Instance;
+ std::unique_ptr<StorageServerInstance> RawInstance;
+ StorageServerInstance::ExclusiveLockedPtr Instance;
{
RwLock::ExclusiveLockScope _(m_Lock);
@@ -336,22 +407,31 @@ Hub::Deprovision(const std::string& ModuleId, std::string& OutReason)
return false;
}
- if (auto It = m_Instances.find(ModuleId); It == m_Instances.end())
+ if (auto It = m_InstanceLookup.find(ModuleId); It == m_InstanceLookup.end())
{
ZEN_WARN("Attempted to deprovision non-existent module '{}'", ModuleId);
- // Not found, OutReason should be empty
+ // Not found, OutReason left empty
return false;
}
else
{
- Instance = std::move(It->second);
- m_Instances.erase(It);
+ const size_t ActiveInstanceIndex = It->second;
+ ZEN_ASSERT(ActiveInstanceIndex < m_ActiveInstances.size());
+ RawInstance = std::move(m_ActiveInstances[ActiveInstanceIndex]);
+ ZEN_ASSERT(RawInstance != nullptr);
+ m_FreeActiveInstanceIndexes.push_back(ActiveInstanceIndex);
+ m_InstanceLookup.erase(It);
m_DeprovisioningModules.emplace(ModuleId);
+
+ Instance = RawInstance->LockExclusive(/*Wait*/ true);
}
}
- uint16_t BasePort = Instance->GetBasePort();
+ ZEN_ASSERT(RawInstance);
+ ZEN_ASSERT(Instance);
+
+ uint16_t BasePort = RawInstance->GetBasePort();
std::string BaseUri; // TODO?
if (m_DeprovisionedModuleCallback)
@@ -366,57 +446,82 @@ Hub::Deprovision(const std::string& ModuleId, std::string& OutReason)
}
}
- // The module is deprovisioned outside the lock to avoid blocking other operations.
+ // The module is deprovisioned outside the hub lock to avoid blocking other operations.
//
// To ensure that no new provisioning can occur while we're deprovisioning,
// we add the module ID to m_DeprovisioningModules and remove it once
// deprovisioning is complete.
auto _ = MakeGuard([&] {
- RwLock::ExclusiveLockScope _(m_Lock);
- m_DeprovisioningModules.erase(ModuleId);
- m_FreePorts.push_back(BasePort);
+ {
+ RwLock::ExclusiveLockScope _(m_Lock);
+ m_DeprovisioningModules.erase(ModuleId);
+ m_FreePorts.push_back(BasePort);
+ }
});
- Instance->Deprovision();
+ Instance.Deprovision();
+
+ Instance = {};
return true;
}
bool
-Hub::Find(std::string_view ModuleId, StorageServerInstance** OutInstance)
+Hub::Find(std::string_view ModuleId, InstanceInfo* OutInstanceInfo)  // Returns true when ModuleId is present in m_InstanceLookup; a non-null OutInstanceInfo then receives a by-value snapshot. On a miss *OutInstanceInfo is left untouched.
{
RwLock::SharedLockScope _(m_Lock);
- if (auto It = m_Instances.find(std::string(ModuleId)); It != m_Instances.end())
+ if (auto It = m_InstanceLookup.find(std::string(ModuleId)); It != m_InstanceLookup.end())
{
- if (OutInstance)
+ if (OutInstanceInfo)  // the out-parameter is optional; callers may pass nullptr for a pure existence check
{
- *OutInstance = It->second.get();
+ const size_t ActiveInstanceIndex = It->second;  // the lookup maps module id -> slot index in m_ActiveInstances
+ ZEN_ASSERT(ActiveInstanceIndex < m_ActiveInstances.size());
+ const std::unique_ptr<StorageServerInstance>& Instance = m_ActiveInstances[ActiveInstanceIndex];
+ ZEN_ASSERT(Instance);
+ InstanceInfo Info{
+ Instance->GetState(),
+ std::chrono::system_clock::now() // TODO: placeholder - this initialises ProvisionTime with the time of the query, not the time the instance was provisioned
+ };
+ Instance->GetProcessMetrics(Info.Metrics);
+
+ *OutInstanceInfo = Info;
}
return true;
}
- else if (OutInstance)
- {
- *OutInstance = nullptr;
- }
return false;
}
+// Invokes Callback once per registered module with the module id and a by-value InstanceInfo
+// snapshot. The snapshots are collected while holding the hub lock in shared mode; the
+// callbacks run after that lock has been released.
void
-Hub::EnumerateModules(std::function<void(StorageServerInstance&)> Callback)
+Hub::EnumerateModules(std::function<void(std::string_view ModuleId, const InstanceInfo&)> Callback)
{
- RwLock::SharedLockScope _(m_Lock);
- for (auto& It : m_Instances)
+ std::vector<std::pair<std::string, InstanceInfo>> Infos;
+ {
+ RwLock::SharedLockScope _(m_Lock);
+ // Size the snapshot once up front so the vector does not regrow while the lock is held
+ Infos.reserve(m_InstanceLookup.size());
+ for (const auto& [ModuleId, ActiveInstanceIndex] : m_InstanceLookup)
+ {
+ // Same slot-index sanity check as Find/Provision/Deprovision
+ ZEN_ASSERT(ActiveInstanceIndex < m_ActiveInstances.size());
+ const std::unique_ptr<StorageServerInstance>& Instance = m_ActiveInstances[ActiveInstanceIndex];
+ ZEN_ASSERT(Instance);
+ InstanceInfo Info{
+ Instance->GetState(),
+ std::chrono::system_clock::now() // TODO
+ };
+ Instance->GetProcessMetrics(Info.Metrics);
+
+ Infos.emplace_back(std::string(Instance->GetModuleId()), Info);
+ }
+ }
+
+ for (const std::pair<std::string, InstanceInfo>& Info : Infos)
{
- Callback(*It.second);
+ Callback(Info.first, Info.second);
}
}
int
Hub::GetInstanceCount()
{
- RwLock::SharedLockScope _(m_Lock);
- return gsl::narrow_cast<int>(m_Instances.size());
+ return m_Lock.WithSharedLock([this]() { return gsl::narrow_cast<int>(m_InstanceLookup.size()); });  // number of entries in m_InstanceLookup, read while holding m_Lock in shared mode
}
void
@@ -424,13 +529,19 @@ Hub::UpdateCapacityMetrics()
{
m_HostMetrics = GetSystemMetrics();
- // Update per-instance metrics
+ // TODO: Should probably go into WatchDog and use atomic for update so it can be read without locks...
+ // Per-instance stats are already refreshed by WatchDog and are readable via the Find and EnumerateModules
}
+// Raises the recorded peak instance count (m_MaxInstanceCount) to the current number of
+// registered instances when that number is larger; never lowers it.
void
Hub::UpdateStats()
{
- m_Lock.WithSharedLock([this] { m_MaxInstanceCount = Max(m_MaxInstanceCount, gsl::narrow_cast<int>(m_Instances.size())); });
+ const int CurrentInstanceCount = m_Lock.WithSharedLock([this] { return gsl::narrow_cast<int>(m_InstanceLookup.size()); });
+ int CurrentMaxCount = m_MaxInstanceCount.load();
+
+ // compare_exchange_weak may fail spuriously or because another thread changed the value.
+ // On failure it reloads CurrentMaxCount, so retry until our count has been stored or it is
+ // no longer a new maximum.
+ while (CurrentMaxCount < CurrentInstanceCount && !m_MaxInstanceCount.compare_exchange_weak(CurrentMaxCount, CurrentInstanceCount))
+ {
+ }
}
bool
@@ -450,19 +561,19 @@ Hub::CanProvisionInstance(std::string_view ModuleId, std::string& OutReason)
return false;
}
- if (gsl::narrow_cast<int>(m_Instances.size()) >= m_Config.InstanceLimit)
+ if (gsl::narrow_cast<int>(m_InstanceLookup.size()) >= m_Config.InstanceLimit)
{
OutReason = fmt::format("instance limit ({}) exceeded", m_Config.InstanceLimit);
return false;
}
- // Since deprovisioning happens outside the lock and we don't add the port back until the instance is full shut down we might be under
- // the instance limit but all ports may be in use
+ // Since deprovisioning happens outside the lock and we don't return the port until the instance is fully shut down, we might be below
+ // the instance count limit but with no free ports available
if (m_FreePorts.empty())
{
OutReason = fmt::format("no free ports available, deprovisioning of instances might be in flight ({})",
- m_Config.InstanceLimit - m_Instances.size());
+ m_Config.InstanceLimit - m_InstanceLookup.size());
return false;
}
@@ -472,6 +583,71 @@ Hub::CanProvisionInstance(std::string_view ModuleId, std::string& OutReason)
return true;
}
+void
+Hub::WatchDog()  // Body of the m_WatchDog thread (started in the Hub constructor): on each wakeup it visits instances round-robin and refreshes the metrics of running ones, until m_WatchDogEvent is set
+{
+ constexpr uint64_t WatchDogWakeupTimeMs = 5000;  // timeout handed to the outer Wait(), i.e. the idle period between passes
+ constexpr uint64_t WatchDogProcessingTimeMs = 500;  // time budget for one pass over the instances
+
+ size_t CheckInstanceIndex = 0;  // round-robin cursor into m_ActiveInstances; kept across passes
+ while (!m_WatchDogEvent.Wait(WatchDogWakeupTimeMs))  // presumably Wait() returns true once the event is signaled (~Hub sets it before joining), which ends the loop
+ {
+ try
+ {
+ size_t MaxCheckCount = m_Lock.WithSharedLock([this]() { return m_InstanceLookup.size(); });  // visit at most as many instances as are registered right now
+
+ Stopwatch Timer;
+ while (MaxCheckCount-- > 0 && Timer.GetElapsedTimeMs() < WatchDogProcessingTimeMs && !m_WatchDogEvent.Wait(5))  // NOTE(review): if this inner Wait(5) observes the shutdown signal, the outer Wait() must still see it - assumes the event stays signaled (manual reset); confirm
+ {
+ StorageServerInstance::SharedLockedPtr LockedInstance;
+ m_Lock.WithSharedLock([this, &CheckInstanceIndex, &LockedInstance]() {  // pick the next instance while holding the hub lock in shared mode
+ if (m_InstanceLookup.empty())
+ {
+ return;
+ }
+
+ size_t MaxLoopCount = m_ActiveInstances.size();  // bounds the scan to one full lap over the slots
+ StorageServerInstance* Instance = nullptr;
+ while (MaxLoopCount-- > 0 && !Instance)  // skip empty (null) slots
+ {
+ CheckInstanceIndex++;  // the cursor is advanced before use, so slot 0 is only reached after wrapping
+ if (CheckInstanceIndex >= m_ActiveInstances.size())
+ {
+ CheckInstanceIndex = 0;
+ }
+ Instance = (CheckInstanceIndex < m_ActiveInstances.size()) ? m_ActiveInstances[CheckInstanceIndex].get() : nullptr;
+ }
+
+ if (Instance)
+ {
+ LockedInstance = Instance->LockShared(/*Wait*/ false);  // non-waiting attempt; presumably yields an empty pointer when the instance lock is unavailable, in which case the instance is skipped this round
+ }
+ });
+
+ if (LockedInstance)  // the per-instance lock is still held here, after the hub lock has been released
+ {
+ if (LockedInstance.IsRunning())
+ {
+ LockedInstance.UpdateMetrics();
+ }
+ else if (LockedInstance.GetState() == HubInstanceState::Provisioned)
+ {
+ // Process is not running but state says it should be — instance died unexpectedly.
+ // TODO: Track and attempt recovery.
+ }
+ // else: transitional state (Provisioning, Deprovisioning, Hibernating, Waking) — expected, skip.
+ LockedInstance = {};  // release the per-instance lock before the next iteration
+ }
+ }
+ }
+ catch (const std::exception& Ex)  // a std::exception ends only the current pass; the thread keeps running
+ {
+ // TODO: Catch specific errors such as asserts, OOM, OOD, system_error etc
+ ZEN_ERROR("Hub watchdog threw exception: {}", Ex.what());
+ }
+ }
+}
+
#if ZEN_WITH_TESTS
TEST_SUITE_BEGIN("server.hub");
@@ -507,7 +683,9 @@ TEST_CASE("hub.provision_basic")
REQUIRE_MESSAGE(ProvisionResult, Reason);
CHECK_NE(Info.Port, 0);
CHECK_EQ(HubInstance->GetInstanceCount(), 1);
- CHECK(HubInstance->Find("module_a"));
+ Hub::InstanceInfo InstanceInfo;
+ REQUIRE(HubInstance->Find("module_a", &InstanceInfo));
+ CHECK_EQ(InstanceInfo.State, HubInstanceState::Provisioned);
const bool DeprovisionResult = HubInstance->Deprovision("module_a", Reason);
CHECK(DeprovisionResult);
@@ -541,7 +719,9 @@ TEST_CASE("hub.provision_config")
REQUIRE_MESSAGE(ProvisionResult, Reason);
CHECK_NE(Info.Port, 0);
CHECK_EQ(HubInstance->GetInstanceCount(), 1);
- CHECK(HubInstance->Find("module_a"));
+ Hub::InstanceInfo InstanceInfo;
+ REQUIRE(HubInstance->Find("module_a", &InstanceInfo));
+ CHECK_EQ(InstanceInfo.State, HubInstanceState::Provisioned);
HttpClient Client(fmt::format("http://127.0.0.1:{}{}", Info.Port, Info.BaseUri));
HttpClient::Response TestResponse = Client.Get("/status/builds");
@@ -660,7 +840,10 @@ TEST_CASE("hub.enumerate_modules")
REQUIRE_MESSAGE(HubInstance->Provision("enum_b", Info, Reason), Reason);
std::vector<std::string> Ids;
- HubInstance->EnumerateModules([&](StorageServerInstance& Instance) { Ids.push_back(std::string(Instance.GetModuleId())); });
+ HubInstance->EnumerateModules([&](std::string_view ModuleId, const Hub::InstanceInfo& Info) {
+ Ids.push_back(std::string(ModuleId));
+ CHECK_EQ(Info.State, HubInstanceState::Provisioned);
+ });
CHECK_EQ(Ids.size(), 2u);
const bool FoundA = std::find(Ids.begin(), Ids.end(), "enum_a") != Ids.end();
const bool FoundB = std::find(Ids.begin(), Ids.end(), "enum_b") != Ids.end();
@@ -669,7 +852,10 @@ TEST_CASE("hub.enumerate_modules")
HubInstance->Deprovision("enum_a", Reason);
Ids.clear();
- HubInstance->EnumerateModules([&](StorageServerInstance& Instance) { Ids.push_back(std::string(Instance.GetModuleId())); });
+ HubInstance->EnumerateModules([&](std::string_view ModuleId, const Hub::InstanceInfo& Info) {
+ Ids.push_back(std::string(ModuleId));
+ CHECK_EQ(Info.State, HubInstanceState::Provisioned);
+ });
REQUIRE_EQ(Ids.size(), 1u);
CHECK_EQ(Ids[0], "enum_b");
}
@@ -936,6 +1122,59 @@ TEST_CASE("hub.job_object")
}
# endif // ZEN_PLATFORM_WINDOWS
+// A freshly provisioned module must report HubInstanceState::Provisioned through Hub::Find,
+// and must no longer be findable once it has been deprovisioned.
+TEST_CASE("hub.instance_state_basic")
+{
+ ScopedTemporaryDirectory TempDir;
+ Hub::Configuration Config;
+ Config.BasePortNumber = 22400;
+ std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config);
+
+ HubProvisionedInstanceInfo ProvInfo;
+ Hub::InstanceInfo Info;
+ std::string Reason;
+
+ REQUIRE_MESSAGE(HubInstance->Provision("state_a", ProvInfo, Reason), Reason);
+
+ REQUIRE(HubInstance->Find("state_a", &Info));
+ CHECK_EQ(Info.State, HubInstanceState::Provisioned);
+
+ // Check the result so a failed deprovision is reported with the hub's reason
+ REQUIRE_MESSAGE(HubInstance->Deprovision("state_a", Reason), Reason);
+ CHECK_FALSE(HubInstance->Find("state_a"));
+}
+
+// EnumerateModules must report every provisioned module as HubInstanceState::Provisioned, and
+// must stop reporting a module once it has been deprovisioned.
+TEST_CASE("hub.instance_state_enumerate")
+{
+ ScopedTemporaryDirectory TempDir;
+ Hub::Configuration Config;
+ Config.BasePortNumber = 22500;
+ std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config);
+
+ HubProvisionedInstanceInfo ProvInfo;
+ std::string Reason;
+ REQUIRE_MESSAGE(HubInstance->Provision("estate_a", ProvInfo, Reason), Reason);
+ REQUIRE_MESSAGE(HubInstance->Provision("estate_b", ProvInfo, Reason), Reason);
+
+ int ProvisionedCount = 0;
+ HubInstance->EnumerateModules([&](std::string_view, const Hub::InstanceInfo& InstanceInfo) {
+ if (InstanceInfo.State == HubInstanceState::Provisioned)
+ {
+ ProvisionedCount++;
+ }
+ });
+ CHECK_EQ(ProvisionedCount, 2);
+
+ // Check the result so a failed deprovision is reported with the hub's reason
+ REQUIRE_MESSAGE(HubInstance->Deprovision("estate_a", Reason), Reason);
+
+ ProvisionedCount = 0;
+ HubInstance->EnumerateModules([&](std::string_view, const Hub::InstanceInfo& InstanceInfo) {
+ if (InstanceInfo.State == HubInstanceState::Provisioned)
+ {
+ ProvisionedCount++;
+ }
+ });
+ CHECK_EQ(ProvisionedCount, 1);
+}
+
TEST_SUITE_END();
void
diff --git a/src/zenserver/hub/hub.h b/src/zenserver/hub/hub.h
index 7232b99ee..7094378f6 100644
--- a/src/zenserver/hub/hub.h
+++ b/src/zenserver/hub/hub.h
@@ -2,6 +2,7 @@
#pragma once
+#include "hubinstancestate.h"
#include "resourcemetrics.h"
#include <zencore/system.h>
@@ -11,6 +12,7 @@
#include <filesystem>
#include <functional>
#include <memory>
+#include <thread>
#include <unordered_map>
#include <unordered_set>
@@ -38,15 +40,15 @@ public:
/** Enable or disable the use of a Windows Job Object for child process management.
* When enabled, all spawned child processes are assigned to a job object with
* JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE, ensuring children are terminated if the hub
- * crashes or is force-killed. Must be called before Initialize(). No-op on non-Windows.
+ * crashes or is force-killed.
*/
bool UseJobObject = true;
uint16_t BasePortNumber = 21000;
int InstanceLimit = 1000;
- uint32_t InstanceHttpThreadCount = 0; // Deduce from core count
- int InstanceCoreLimit = 0; // Use hardware core count
+ uint32_t InstanceHttpThreadCount = 0; // Automatic
+ int InstanceCoreLimit = 0; // Automatic
std::filesystem::path InstanceConfigPath;
std::string HydrationTargetSpecification;
};
@@ -62,6 +64,13 @@ public:
Hub(const Hub&) = delete;
Hub& operator=(const Hub&) = delete;
+ struct InstanceInfo // Value copy of one instance's status, handed out by Find() and EnumerateModules()
+ {
+ HubInstanceState State = HubInstanceState::Unprovisioned; // Lifecycle state; defaults to Unprovisioned
+ std::chrono::system_clock::time_point ProvisionTime; // NOTE(review): presumably when the instance was provisioned - confirm where this is set
+ ProcessMetrics Metrics; // Process metrics reported for the instance
+ };
+
/**
* Provision a storage server instance for the given module ID.
*
@@ -81,29 +90,24 @@ public:
bool Deprovision(const std::string& ModuleId, std::string& OutReason);
/**
- * Find a storage server instance for the given module ID.
- *
- * Beware that as this returns a raw pointer to the instance, the caller must ensure
- * that the instance is not deprovisioned while in use.
+ * Find info about storage server instance for the given module ID.
*
* @param ModuleId The ID of the module to find.
- * @param OutInstance If found, the instance will be returned here.
+ * @param OutInstanceInfo If found, the instance info will be returned here.
* @return true if the instance was found, false otherwise.
*/
- bool Find(std::string_view ModuleId, StorageServerInstance** OutInstance = nullptr);
+ bool Find(std::string_view ModuleId, InstanceInfo* OutInstanceInfo = nullptr);
/**
- * Enumerate all storage server instances.
+ * Enumerate a snapshot of all storage server instances.
*
- * @param Callback The callback to invoke for each instance. Note that you should
- * not do anything heavyweight in the callback as it is invoked while holding
- * a shared lock.
+ * @param Callback The callback to invoke for each instance.
*/
- void EnumerateModules(std::function<void(StorageServerInstance&)> Callback);
+ void EnumerateModules(std::function<void(std::string_view ModuleId, const InstanceInfo&)> Callback);
int GetInstanceCount();
- int GetMaxInstanceCount() const { return m_MaxInstanceCount; }
+ int GetMaxInstanceCount() const { return m_MaxInstanceCount.load(); } // m_MaxInstanceCount is std::atomic<int>; this is a plain atomic read
const Configuration& GetConfig() const { return m_Config; }
@@ -120,14 +124,20 @@ private:
#if ZEN_PLATFORM_WINDOWS
JobObject m_JobObject;
#endif
- RwLock m_Lock;
- std::unordered_map<std::string, std::unique_ptr<StorageServerInstance>> m_Instances;
- std::unordered_set<std::string> m_DeprovisioningModules;
- std::unordered_set<std::string> m_ProvisioningModules;
- ResourceMetrics m_ResourceLimits;
- SystemMetrics m_HostMetrics;
- int m_MaxInstanceCount = 0;
- std::deque<uint16_t> m_FreePorts;
+ RwLock m_Lock;
+ std::unordered_map<std::string, size_t> m_InstanceLookup;
+ std::unordered_set<std::string> m_DeprovisioningModules;
+ std::unordered_set<std::string> m_ProvisioningModules;
+ std::vector<std::unique_ptr<StorageServerInstance>> m_ActiveInstances;
+ std::vector<size_t> m_FreeActiveInstanceIndexes;
+ ResourceMetrics m_ResourceLimits;
+ SystemMetrics m_HostMetrics;
+ std::atomic<int> m_MaxInstanceCount = 0;
+ std::deque<uint16_t> m_FreePorts;
+ std::thread m_WatchDog;
+
+ Event m_WatchDogEvent;
+ void WatchDog();
void UpdateStats();
void UpdateCapacityMetrics();
diff --git a/src/zenserver/hub/hubinstancestate.cpp b/src/zenserver/hub/hubinstancestate.cpp
new file mode 100644
index 000000000..8337f73a8
--- /dev/null
+++ b/src/zenserver/hub/hubinstancestate.cpp
@@ -0,0 +1,33 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "hubinstancestate.h"
+
+#include <zencore/assertfmt.h>
+
+namespace zen {
+
+/**
+ * Returns the lowercase name of a hub instance lifecycle state.
+ *
+ * A value that matches none of the enumerators trips ZEN_ASSERT and yields "unknown".
+ */
+std::string_view
+ToString(HubInstanceState State)
+{
+    std::string_view Name;
+
+    // There is no default label; a value outside the enumerators leaves Name empty.
+    switch (State)
+    {
+        case HubInstanceState::Unprovisioned:
+            Name = "unprovisioned";
+            break;
+        case HubInstanceState::Provisioning:
+            Name = "provisioning";
+            break;
+        case HubInstanceState::Provisioned:
+            Name = "provisioned";
+            break;
+        case HubInstanceState::Hibernating:
+            Name = "hibernating";
+            break;
+        case HubInstanceState::Hibernated:
+            Name = "hibernated";
+            break;
+        case HubInstanceState::Waking:
+            Name = "waking";
+            break;
+        case HubInstanceState::Deprovisioning:
+            Name = "deprovisioning";
+            break;
+    }
+
+    if (Name.empty())
+    {
+        ZEN_ASSERT(false);
+        return "unknown";
+    }
+    return Name;
+}
+
+} // namespace zen
diff --git a/src/zenserver/hub/hubinstancestate.h b/src/zenserver/hub/hubinstancestate.h
new file mode 100644
index 000000000..ce1f222bd
--- /dev/null
+++ b/src/zenserver/hub/hubinstancestate.h
@@ -0,0 +1,23 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <cstdint>
+#include <string_view>
+
+namespace zen {
+
+enum class HubInstanceState : uint32_t // Lifecycle of a hub-managed instance; ToString() has one case per value, so extend it when adding a state
+{
+ Unprovisioned, // Initial state; process not running
+ Provisioning, // Hydrating and spawning process
+ Provisioned, // Process running and serving requests
+ Hibernating, // Shutting down process, preserving data on disk
+ Hibernated, // Process stopped, data preserved; can be woken
+ Waking, // Starting process from preserved data
+ Deprovisioning, // Shutting down process and cleaning up data
+};
+
+std::string_view ToString(HubInstanceState State);
+
+} // namespace zen
diff --git a/src/zenserver/hub/storageserverinstance.cpp b/src/zenserver/hub/storageserverinstance.cpp
index c54322683..bab501429 100644
--- a/src/zenserver/hub/storageserverinstance.cpp
+++ b/src/zenserver/hub/storageserverinstance.cpp
@@ -57,72 +57,82 @@ StorageServerInstance::SpawnServerProcess()
}
void
-StorageServerInstance::Provision()
+StorageServerInstance::GetProcessMetrics(ProcessMetrics& OutMetrics) const // Copies the cached per-process counters into OutMetrics
{
- RwLock::ExclusiveLockScope _(m_Lock);
+ OutMetrics.MemoryBytes = m_MemoryBytes.load(); // Each counter is its own atomic, loaded one at a time, so the copy is not a single snapshot across fields
+ OutMetrics.KernelTimeMs = m_KernelTimeMs.load();
+ OutMetrics.UserTimeMs = m_UserTimeMs.load();
+ OutMetrics.WorkingSetSize = m_WorkingSetSize.load();
+ OutMetrics.PeakWorkingSetSize = m_PeakWorkingSetSize.load();
+ OutMetrics.PagefileUsage = m_PagefileUsage.load();
+ OutMetrics.PeakPagefileUsage = m_PeakPagefileUsage.load();
+}
- if (m_IsProvisioned)
+// Brings the instance to the Provisioned state. Reached through ExclusiveLockedPtr::Provision().
+//  - Provisioned:   logs a warning and returns.
+//  - Hibernated:    tries to wake in place; if that fails, provisions from scratch.
+//  - Unprovisioned: hydrates and spawns the server process.
+//  - anything else: logs a warning and leaves the state untouched.
+// An exception from Hydrate() or SpawnServerProcess() is propagated to the caller after the
+// state has been rolled back to Unprovisioned.
+void
+StorageServerInstance::ProvisionLocked()
+{
+ if (m_State.load() == HubInstanceState::Provisioned)
{
ZEN_WARN("Storage server instance for module '{}' is already provisioned", m_ModuleId);
-
return;
}
- if (m_IsHibernated)
+ if (m_State.load() == HubInstanceState::Hibernated)
{
- WakeLocked();
+ if (WakeLocked())
+ {
+ return;
+ }
+ // Wake failed; proceed with a fresh provision (discards hibernated data)
+ m_State = HubInstanceState::Unprovisioned;
}
- else
+ else if (m_State.load() != HubInstanceState::Unprovisioned)
{
- ZEN_INFO("Provisioning storage server instance for module '{}', at '{}'", m_ModuleId, m_BaseDir);
-
- Hydrate();
-
- SpawnServerProcess();
+ ZEN_WARN("Storage server instance for module '{}' is in unexpected state '{}', cannot provision",
+ m_ModuleId,
+ ToString(m_State.load()));
+ return;
}
- m_IsProvisioned = true;
+ ZEN_INFO("Provisioning storage server instance for module '{}', at '{}'", m_ModuleId, m_BaseDir);
+
+ m_State = HubInstanceState::Provisioning;
+ try
+ {
+ Hydrate();
+ SpawnServerProcess();
+ }
+ catch (...)
+ {
+ // Do not leave the instance stuck in the transitional state: none of the *Locked
+ // operations accepts Provisioning, so roll back before the caller sees the failure.
+ m_State = HubInstanceState::Unprovisioned;
+ throw;
+ }
+ m_State = HubInstanceState::Provisioned;
}
void
-StorageServerInstance::Deprovision()
+StorageServerInstance::DeprovisionLocked() // Shuts the server process down and dehydrates; only acts when the state is Provisioned
{
- RwLock::ExclusiveLockScope _(m_Lock);
-
- if (!m_IsProvisioned)
+ if (m_State.load() != HubInstanceState::Provisioned)
{
- ZEN_WARN("Attempted to deprovision storage server instance for module '{}' which is not provisioned", m_ModuleId);
-
+ ZEN_WARN("Attempted to deprovision storage server instance for module '{}' which is not provisioned (state: '{}')",
+ m_ModuleId,
+ ToString(m_State.load()));
return;
}
ZEN_INFO("Deprovisioning storage server instance for module '{}'", m_ModuleId);
+ m_State = HubInstanceState::Deprovisioning; // NOTE(review): if Shutdown() or Dehydrate() throws, the state stays Deprovisioning - confirm the caller copes with that
m_ServerInstance.Shutdown();
Dehydrate();
- m_IsProvisioned = false;
+ m_State = HubInstanceState::Unprovisioned;
}
void
-StorageServerInstance::Hibernate()
+StorageServerInstance::HibernateLocked()
{
// Signal server to shut down, but keep data around for later wake
- RwLock::ExclusiveLockScope _(m_Lock);
-
- if (!m_IsProvisioned)
- {
- ZEN_WARN("Attempted to hibernate storage server instance for module '{}' which is not provisioned", m_ModuleId);
-
- return;
- }
-
- if (m_IsHibernated)
+ if (m_State.load() != HubInstanceState::Provisioned)
{
- ZEN_WARN("Storage server instance for module '{}' is already hibernated", m_ModuleId);
-
+ ZEN_WARN("Attempted to hibernate storage server instance for module '{}' which is not provisioned (state: '{}')",
+ m_ModuleId,
+ ToString(m_State.load()));
return;
}
@@ -135,52 +145,46 @@ StorageServerInstance::Hibernate()
return;
}
+ m_State = HubInstanceState::Hibernating;
try
{
m_ServerInstance.Shutdown();
-
- m_IsHibernated = true;
- m_IsProvisioned = false;
-
+ m_State = HubInstanceState::Hibernated;
return;
}
catch (const std::exception& Ex)
{
ZEN_ERROR("Failed to hibernate storage server instance for module '{}': {}", m_ModuleId, Ex.what());
+ m_State = HubInstanceState::Provisioned; // Shutdown failed; instance is still running
}
}
-void
-StorageServerInstance::Wake()
-{
- RwLock::ExclusiveLockScope _(m_Lock);
- WakeLocked();
-}
-
-void
+bool // false only when SpawnServerProcess() throws std::exception; true otherwise, including the no-op path below
StorageServerInstance::WakeLocked()
{
// Start server in-place using existing data
- if (!m_IsHibernated)
+ if (m_State.load() != HubInstanceState::Hibernated)
{
ZEN_WARN("Attempted to wake storage server instance for module '{}' which is not hibernated", m_ModuleId);
- return;
+ return true; // Instance is already usable (noop success). NOTE(review): this path is also taken for Unprovisioned, where no process runs - confirm callers do not read true as "running"
}
ZEN_ASSERT_FORMAT(!m_ServerInstance.IsRunning(), "Storage server instance for module '{}' is already running", m_ModuleId);
+ m_State = HubInstanceState::Waking;
try
{
SpawnServerProcess();
- m_IsHibernated = false;
+ m_State = HubInstanceState::Provisioned;
+ return true;
}
catch (const std::exception& Ex)
{
ZEN_ERROR("Failed to wake storage server instance for module '{}': {}", m_ModuleId, Ex.what());
-
- // TODO: this instance should be marked as invalid
+ m_State = HubInstanceState::Hibernated; // Spawn failed: back to Hibernated, which ProvisionLocked() uses to fall back to a fresh provision
+ return false;
}
}
@@ -210,4 +214,193 @@ StorageServerInstance::Dehydrate()
Hydrator->Dehydrate();
}
+// Creates an empty handle: no lock is held and no instance is referenced.
+StorageServerInstance::SharedLockedPtr::SharedLockedPtr()
+: m_Lock{nullptr}
+, m_Instance{nullptr}
+{
+}
+
+// Acquires Lock in shared mode on behalf of Instance.
+// With Wait set the lock is taken via AcquireShared(); otherwise a single TryAcquireShared()
+// is made and the handle stays empty when that attempt fails.
+StorageServerInstance::SharedLockedPtr::SharedLockedPtr(RwLock& Lock, StorageServerInstance* Instance, bool Wait)
+: m_Lock(nullptr)
+, m_Instance(nullptr)
+{
+    ZEN_ASSERT(Instance != nullptr);
+
+    if (Wait)
+    {
+        Lock.AcquireShared();
+    }
+    else if (!Lock.TryAcquireShared())
+    {
+        // Lock not available: stay empty so operator bool() reports the failure.
+        return;
+    }
+
+    m_Lock     = &Lock;
+    m_Instance = Instance;
+}
+
+// Takes over the lock and instance held by Rhs and leaves Rhs empty.
+StorageServerInstance::SharedLockedPtr::SharedLockedPtr(SharedLockedPtr&& Rhs)
+: m_Lock(nullptr)
+, m_Instance(nullptr)
+{
+    m_Lock     = Rhs.m_Lock;
+    m_Instance = Rhs.m_Instance;
+
+    Rhs.m_Instance = nullptr;
+    Rhs.m_Lock     = nullptr;
+}
+
+// Releases the shared lock if this handle holds one, then clears the handle.
+StorageServerInstance::SharedLockedPtr::~SharedLockedPtr()
+{
+    if (RwLock* HeldLock = m_Lock)
+    {
+        HeldLock->ReleaseShared();
+        m_Lock = nullptr;
+    }
+    m_Instance = nullptr;
+}
+
+// Move assignment: releases the lock this handle currently holds (if any), then takes over
+// the lock and instance of Rhs and leaves Rhs empty.
+// Assigning a handle to itself is a no-op, so the handle keeps its lock.
+StorageServerInstance::SharedLockedPtr&
+StorageServerInstance::SharedLockedPtr::operator=(SharedLockedPtr&& Rhs)
+{
+    if (this == &Rhs)
+    {
+        // Without this guard a self-move would release the lock and end up empty.
+        return *this;
+    }
+
+    if (m_Lock != nullptr)
+    {
+        m_Lock->ReleaseShared();
+    }
+
+    m_Lock         = Rhs.m_Lock;
+    m_Instance     = Rhs.m_Instance;
+    Rhs.m_Lock     = nullptr;
+    Rhs.m_Instance = nullptr;
+    return *this;
+}
+
+// Module id of the locked instance, as a view into the instance's own string.
+// Asserts when the handle is empty.
+std::string_view
+StorageServerInstance::SharedLockedPtr::GetModuleId() const
+{
+    ZEN_ASSERT(m_Instance != nullptr);
+    const std::string_view ModuleId{m_Instance->m_ModuleId};
+    return ModuleId;
+}
+
+// True when the instance is in the Provisioned state and its server process reports running.
+// Asserts when the handle is empty.
+bool
+StorageServerInstance::SharedLockedPtr::IsRunning() const
+{
+    ZEN_ASSERT(m_Instance != nullptr);
+
+    if (m_Instance->m_State.load() != HubInstanceState::Provisioned)
+    {
+        return false;
+    }
+    return m_Instance->m_ServerInstance.IsRunning();
+}
+
+// Refreshes the cached process counters from the server process.
+// Does nothing unless the instance is in the Provisioned state.
+// TODO: Resource metrics...
+void
+StorageServerInstance::UpdateMetricsLocked()
+{
+    if (m_State.load() != HubInstanceState::Provisioned)
+    {
+        return;
+    }
+
+    ProcessMetrics Sample;
+    zen::GetProcessMetrics(m_ServerInstance.GetProcessHandle(), Sample);
+
+    // Publish each counter through its own atomic; GetProcessMetrics() reads them the same way.
+    m_MemoryBytes        = Sample.MemoryBytes;
+    m_KernelTimeMs       = Sample.KernelTimeMs;
+    m_UserTimeMs         = Sample.UserTimeMs;
+    m_WorkingSetSize     = Sample.WorkingSetSize;
+    m_PeakWorkingSetSize = Sample.PeakWorkingSetSize;
+    m_PagefileUsage      = Sample.PagefileUsage;
+    m_PeakPagefileUsage  = Sample.PeakPagefileUsage;
+}
+
+// Creates an empty handle: no lock is held and no instance is referenced.
+StorageServerInstance::ExclusiveLockedPtr::ExclusiveLockedPtr()
+: m_Lock{nullptr}
+, m_Instance{nullptr}
+{
+}
+
+// Acquires Lock in exclusive mode on behalf of Instance.
+// With Wait set the lock is taken via AcquireExclusive(); otherwise a single
+// TryAcquireExclusive() is made and the handle stays empty when that attempt fails.
+StorageServerInstance::ExclusiveLockedPtr::ExclusiveLockedPtr(RwLock& Lock, StorageServerInstance* Instance, bool Wait)
+: m_Lock(nullptr)
+, m_Instance(nullptr)
+{
+    ZEN_ASSERT(Instance != nullptr);
+
+    if (Wait)
+    {
+        Lock.AcquireExclusive();
+    }
+    else if (!Lock.TryAcquireExclusive())
+    {
+        // Lock not available: stay empty so operator bool() reports the failure.
+        return;
+    }
+
+    m_Lock     = &Lock;
+    m_Instance = Instance;
+}
+
+// Takes over the lock and instance held by Rhs and leaves Rhs empty.
+StorageServerInstance::ExclusiveLockedPtr::ExclusiveLockedPtr(ExclusiveLockedPtr&& Rhs)
+: m_Lock(nullptr)
+, m_Instance(nullptr)
+{
+    m_Lock     = Rhs.m_Lock;
+    m_Instance = Rhs.m_Instance;
+
+    Rhs.m_Instance = nullptr;
+    Rhs.m_Lock     = nullptr;
+}
+
+// Releases the exclusive lock if this handle holds one, then clears the handle.
+StorageServerInstance::ExclusiveLockedPtr::~ExclusiveLockedPtr()
+{
+    if (RwLock* HeldLock = m_Lock)
+    {
+        HeldLock->ReleaseExclusive();
+        m_Lock = nullptr;
+    }
+    m_Instance = nullptr;
+}
+
+// Move assignment: releases the lock this handle currently holds (if any), then takes over
+// the lock and instance of Rhs and leaves Rhs empty.
+// Assigning a handle to itself is a no-op, so the handle keeps its lock.
+StorageServerInstance::ExclusiveLockedPtr&
+StorageServerInstance::ExclusiveLockedPtr::operator=(ExclusiveLockedPtr&& Rhs)
+{
+    if (this == &Rhs)
+    {
+        // Without this guard a self-move would release the lock and end up empty.
+        return *this;
+    }
+
+    if (m_Lock != nullptr)
+    {
+        m_Lock->ReleaseExclusive();
+    }
+
+    m_Lock         = Rhs.m_Lock;
+    m_Instance     = Rhs.m_Instance;
+    Rhs.m_Lock     = nullptr;
+    Rhs.m_Instance = nullptr;
+    return *this;
+}
+
+// Module id of the locked instance, as a view into the instance's own string.
+// Asserts when the handle is empty.
+std::string_view
+StorageServerInstance::ExclusiveLockedPtr::GetModuleId() const
+{
+    ZEN_ASSERT(m_Instance != nullptr);
+    const std::string_view ModuleId{m_Instance->m_ModuleId};
+    return ModuleId;
+}
+
+// True when the instance is in the Provisioned state and its server process reports running.
+// Asserts when the handle is empty.
+bool
+StorageServerInstance::ExclusiveLockedPtr::IsRunning() const
+{
+    ZEN_ASSERT(m_Instance != nullptr);
+
+    if (m_Instance->m_State.load() != HubInstanceState::Provisioned)
+    {
+        return false;
+    }
+    return m_Instance->m_ServerInstance.IsRunning();
+}
+
+void
+StorageServerInstance::ExclusiveLockedPtr::Provision() // Runs ProvisionLocked() on the referenced instance; asserts when the handle is empty
+{
+ ZEN_ASSERT(m_Instance != nullptr);
+ m_Instance->ProvisionLocked();
+}
+
+void
+StorageServerInstance::ExclusiveLockedPtr::Deprovision() // Runs DeprovisionLocked() on the referenced instance; asserts when the handle is empty
+{
+ ZEN_ASSERT(m_Instance != nullptr);
+ m_Instance->DeprovisionLocked();
+}
+
+void
+StorageServerInstance::ExclusiveLockedPtr::Hibernate() // Runs HibernateLocked() on the referenced instance; asserts when the handle is empty
+{
+ ZEN_ASSERT(m_Instance != nullptr);
+ m_Instance->HibernateLocked();
+}
+
+bool
+StorageServerInstance::ExclusiveLockedPtr::Wake() // Returns the result of WakeLocked() on the referenced instance; asserts when the handle is empty
+{
+ ZEN_ASSERT(m_Instance != nullptr);
+ return m_Instance->WakeLocked();
+}
+
} // namespace zen
diff --git a/src/zenserver/hub/storageserverinstance.h b/src/zenserver/hub/storageserverinstance.h
index 9b8aa7ac9..bf6fff8a0 100644
--- a/src/zenserver/hub/storageserverinstance.h
+++ b/src/zenserver/hub/storageserverinstance.h
@@ -2,6 +2,7 @@
#pragma once
+#include "hubinstancestate.h"
#include "resourcemetrics.h"
#include <zenutil/zenserverprocess.h>
@@ -26,44 +27,137 @@ public:
uint16_t BasePort;
std::filesystem::path HydrationTempPath;
std::string HydrationTargetSpecification;
- uint32_t HttpThreadCount = 0; // Deduce from core count
- int CoreLimit = 0; // Use hardware core count
+ uint32_t HttpThreadCount = 0; // Automatic
+ int CoreLimit = 0; // Automatic
std::filesystem::path ConfigPath;
};
StorageServerInstance(ZenServerEnvironment& RunEnvironment, const Configuration& Config, std::string_view ModuleId);
~StorageServerInstance();
- void Provision();
- void Deprovision();
-
- void Hibernate();
- void Wake();
-
const ResourceMetrics& GetResourceMetrics() const { return m_ResourceMetrics; }
inline std::string_view GetModuleId() const { return m_ModuleId; }
- inline bool IsProvisioned() const { return m_IsProvisioned.load(); }
-
- inline uint16_t GetBasePort() const { return m_ServerInstance.GetBasePort(); }
+ inline HubInstanceState GetState() const { return m_State.load(); }
+ // Base port taken from this instance's configuration.
+ inline uint16_t GetBasePort() const { return m_Config.BasePort; }
+ void GetProcessMetrics(ProcessMetrics& OutMetrics) const;
#if ZEN_PLATFORM_WINDOWS
void SetJobObject(JobObject* InJobObject) { m_JobObject = InJobObject; }
#endif
+ /**
+  * Move-only handle that keeps the instance lock held in shared mode until it is destroyed
+  * or moved from. An empty handle (default constructed, moved from, or a non-waiting
+  * acquire that failed) converts to false; the accessors assert on an empty handle.
+  */
+ class SharedLockedPtr
+ {
+ public:
+     SharedLockedPtr();
+     SharedLockedPtr(RwLock& Lock, StorageServerInstance* Instance, bool Wait);
+     SharedLockedPtr(SharedLockedPtr&& Rhs);
+     SharedLockedPtr(const SharedLockedPtr&) = delete;
+     ~SharedLockedPtr();
+
+     SharedLockedPtr& operator=(SharedLockedPtr&& Rhs);
+     SharedLockedPtr& operator=(const SharedLockedPtr&) = delete;
+
+     // True when the handle refers to an instance, i.e. the lock was acquired.
+     operator bool() const { return m_Instance != nullptr; }
+
+     std::string_view GetModuleId() const;
+     bool             IsRunning() const;
+
+     HubInstanceState GetState() const
+     {
+         ZEN_ASSERT(m_Instance);
+         return m_Instance->m_State.load();
+     }
+
+     const ResourceMetrics& GetResourceMetrics() const
+     {
+         ZEN_ASSERT(m_Instance);
+         return m_Instance->m_ResourceMetrics;
+     }
+
+     // Refreshes the instance's cached process metrics.
+     void UpdateMetrics()
+     {
+         ZEN_ASSERT(m_Instance);
+         m_Instance->UpdateMetricsLocked();
+     }
+
+ private:
+     RwLock*                m_Lock     = nullptr;
+     StorageServerInstance* m_Instance = nullptr;
+ };
+
+ // Returns a shared-lock handle for this instance; with Wait == false the handle may be empty.
+ [[nodiscard]] SharedLockedPtr LockShared(bool Wait)
+ {
+     return SharedLockedPtr{m_Lock, this, Wait};
+ }
+
+ /**
+  * Move-only handle that keeps the instance lock held in exclusive mode until it is
+  * destroyed or moved from. The state-changing operations (Provision, Deprovision,
+  * Hibernate, Wake) are only reachable through this handle. An empty handle (default
+  * constructed, moved from, or a non-waiting acquire that failed) converts to false;
+  * the accessors and operations assert on an empty handle.
+  */
+ class ExclusiveLockedPtr
+ {
+ public:
+     ExclusiveLockedPtr();
+     ExclusiveLockedPtr(RwLock& Lock, StorageServerInstance* Instance, bool Wait);
+     ExclusiveLockedPtr(ExclusiveLockedPtr&& Rhs);
+     ExclusiveLockedPtr(const ExclusiveLockedPtr&) = delete;
+     ~ExclusiveLockedPtr();
+
+     ExclusiveLockedPtr& operator=(ExclusiveLockedPtr&& Rhs);
+     ExclusiveLockedPtr& operator=(const ExclusiveLockedPtr&) = delete;
+
+     // True when the handle refers to an instance, i.e. the lock was acquired.
+     operator bool() const { return m_Instance != nullptr; }
+
+     std::string_view GetModuleId() const;
+     bool             IsRunning() const;
+
+     HubInstanceState GetState() const
+     {
+         ZEN_ASSERT(m_Instance);
+         return m_Instance->m_State.load();
+     }
+
+     const ResourceMetrics& GetResourceMetrics() const
+     {
+         ZEN_ASSERT(m_Instance);
+         return m_Instance->m_ResourceMetrics;
+     }
+
+     void Provision();
+     void Deprovision();
+     void Hibernate();
+     bool Wake();
+
+ private:
+     RwLock*                m_Lock     = nullptr;
+     StorageServerInstance* m_Instance = nullptr;
+ };
+
+ // Returns an exclusive-lock handle for this instance; with Wait == false the handle may be empty.
+ [[nodiscard]] ExclusiveLockedPtr LockExclusive(bool Wait)
+ {
+     return ExclusiveLockedPtr{m_Lock, this, Wait};
+ }
+
private:
- void WakeLocked();
- RwLock m_Lock;
+ void ProvisionLocked();
+ void DeprovisionLocked();
+
+ void HibernateLocked();
+ [[nodiscard]] bool WakeLocked();
+
+ void UpdateMetricsLocked();
+
+ mutable RwLock m_Lock;
const Configuration m_Config;
std::string m_ModuleId;
ZenServerInstance m_ServerInstance;
- std::atomic<bool> m_IsProvisioned{false};
- std::atomic<bool> m_IsHibernated{false};
- std::filesystem::path m_BaseDir;
+ std::atomic<HubInstanceState> m_State{HubInstanceState::Unprovisioned};
+ std::filesystem::path m_BaseDir;
std::filesystem::path m_TempDir;
ResourceMetrics m_ResourceMetrics;
+
+ std::atomic<uint64_t> m_MemoryBytes = 0;
+ std::atomic<uint64_t> m_KernelTimeMs = 0;
+ std::atomic<uint64_t> m_UserTimeMs = 0;
+ std::atomic<uint64_t> m_WorkingSetSize = 0;
+ std::atomic<uint64_t> m_PeakWorkingSetSize = 0;
+ std::atomic<uint64_t> m_PagefileUsage = 0;
+ std::atomic<uint64_t> m_PeakPagefileUsage = 0;
+
#if ZEN_PLATFORM_WINDOWS
JobObject* m_JobObject = nullptr;
#endif
@@ -72,6 +166,9 @@ private:
void Hydrate();
void Dehydrate();
+
+ friend class SharedLockedPtr;
+ friend class ExclusiveLockedPtr;
};
} // namespace zen
diff --git a/src/zenserver/hub/zenhubserver.h b/src/zenserver/hub/zenhubserver.h
index 33d813122..d44a13d86 100644
--- a/src/zenserver/hub/zenhubserver.h
+++ b/src/zenserver/hub/zenhubserver.h
@@ -29,8 +29,8 @@ struct ZenHubServerConfig : public ZenServerConfig
int HubInstanceLimit = 1000;
bool HubUseJobObject = true;
std::string HubInstanceHttpClass = "asio";
- uint32_t HubInstanceHttpThreadCount = 0; // Deduce from core count
- int HubInstanceCoreLimit = 0; // Use hardware core count
+ uint32_t HubInstanceHttpThreadCount = 0; // Automatic
+ int HubInstanceCoreLimit = 0; // Automatic
std::filesystem::path HubInstanceConfigPath; // Path to Lua config file
std::string HydrationTargetSpecification; // hydration/dehydration target specification
};
diff --git a/src/zenutil/include/zenutil/zenserverprocess.h b/src/zenutil/include/zenutil/zenserverprocess.h
index 5528b766c..308ae0ef2 100644
--- a/src/zenutil/include/zenutil/zenserverprocess.h
+++ b/src/zenutil/include/zenutil/zenserverprocess.h
@@ -115,22 +115,23 @@ struct ZenServerInstance
ZenServerInstance(ZenServerEnvironment& TestEnvironment, ServerMode Mode = ServerMode::kStorageServer);
~ZenServerInstance();
- int Shutdown();
- bool SignalShutdown(std::error_code& OutEc);
- uint16_t WaitUntilReady();
- [[nodiscard]] bool WaitUntilReady(int Timeout);
- [[nodiscard]] bool WaitUntilExited(int Timeout, std::error_code& OutEc);
- void EnableTermination() { m_Terminate = true; }
- void EnableShutdownOnDestroy() { m_ShutdownOnDestroy = true; }
- void DisableShutdownOnDestroy() { m_ShutdownOnDestroy = false; }
- void Detach();
- inline int GetPid() const { return m_Process.Pid(); }
- inline void SetOwnerPid(int Pid) { m_OwnerPid = Pid; }
- void* GetProcessHandle() const { return m_Process.Handle(); }
+ int Shutdown();
+ bool SignalShutdown(std::error_code& OutEc);
+ uint16_t WaitUntilReady();
+ [[nodiscard]] bool WaitUntilReady(int Timeout);
+ [[nodiscard]] bool WaitUntilExited(int Timeout, std::error_code& OutEc);
+ void EnableTermination() { m_Terminate = true; }
+ void EnableShutdownOnDestroy() { m_ShutdownOnDestroy = true; }
+ void DisableShutdownOnDestroy() { m_ShutdownOnDestroy = false; }
+ void Detach();
+ inline int GetPid() const { return m_Process.Pid(); }
+ inline void SetOwnerPid(int Pid) { m_OwnerPid = Pid; }
+ const ProcessHandle& GetProcessHandle() const { return m_Process; }
+
#if ZEN_PLATFORM_WINDOWS
void SetJobObject(JobObject* Job) { m_JobObject = Job; }
#endif
- bool IsRunning();
+ bool IsRunning() const;
bool Terminate();
std::string GetLogOutput() const;
diff --git a/src/zenutil/zenserverprocess.cpp b/src/zenutil/zenserverprocess.cpp
index ebce9730e..8eaf2cf5b 100644
--- a/src/zenutil/zenserverprocess.cpp
+++ b/src/zenutil/zenserverprocess.cpp
@@ -1419,7 +1419,7 @@ ZenServerInstance::SetDataDir(std::filesystem::path TestDir)
}
bool
-ZenServerInstance::IsRunning()
+ZenServerInstance::IsRunning() const
{
if (!m_Process.IsValid())
{