diff options
| author | Dan Engelbrecht <[email protected]> | 2026-03-20 13:44:00 +0100 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2026-03-20 13:44:00 +0100 |
| commit | 7cc4b1701aa2923573adabceed486229abba5a2d (patch) | |
| tree | 04a1b5eddcabd24e5c5a50a817fa50c5829972f2 /src | |
| parent | Zs/consul token (#870) (diff) | |
| download | zen-7cc4b1701aa2923573adabceed486229abba5a2d.tar.xz zen-7cc4b1701aa2923573adabceed486229abba5a2d.zip | |
add hub instance info (#869)
- Improvement: Hub module listing now includes per-instance process metrics (memory, CPU time, working set, pagefile usage)
- Improvement: Hub now monitors provisioned instance health in the background and refreshes process metrics periodically
- Improvement: Hub no longer exposes raw `StorageServerInstance` pointers to callers; instance state is returned as value snapshots (`Hub::InstanceInfo`)
- Improvement: Hub instance access is now guarded by RAII per-instance locks (`SharedLockedPtr`/`ExclusiveLockedPtr`), preventing concurrent modifications during provisioning and deprovisioning
- Improvement: Hub instance lifecycle is now tracked as a `HubInstanceState` enum covering transitional states (Provisioning, Deprovisioning, Hibernating, Waking); exposed as a string in the HTTP API and dashboard
Diffstat (limited to 'src')
| -rw-r--r-- | src/zencore/include/zencore/process.h | 2 | ||||
| -rw-r--r-- | src/zencore/include/zencore/thread.h | 2 | ||||
| -rw-r--r-- | src/zencore/process.cpp | 2 | ||||
| -rw-r--r-- | src/zencore/thread.cpp | 12 | ||||
| -rw-r--r-- | src/zenserver-test/hub-tests.cpp | 12 | ||||
| -rw-r--r-- | src/zenserver-test/projectstore-tests.cpp | 13 | ||||
| -rw-r--r-- | src/zenserver/frontend/html/compute/hub.html | 6 | ||||
| -rw-r--r-- | src/zenserver/frontend/html/pages/hub.js | 2 | ||||
| -rw-r--r-- | src/zenserver/hub/httphubservice.cpp | 47 | ||||
| -rw-r--r-- | src/zenserver/hub/hub.cpp | 385 | ||||
| -rw-r--r-- | src/zenserver/hub/hub.h | 56 | ||||
| -rw-r--r-- | src/zenserver/hub/hubinstancestate.cpp | 33 | ||||
| -rw-r--r-- | src/zenserver/hub/hubinstancestate.h | 23 | ||||
| -rw-r--r-- | src/zenserver/hub/storageserverinstance.cpp | 293 | ||||
| -rw-r--r-- | src/zenserver/hub/storageserverinstance.h | 129 | ||||
| -rw-r--r-- | src/zenserver/hub/zenhubserver.h | 4 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/zenserverprocess.h | 27 | ||||
| -rw-r--r-- | src/zenutil/zenserverprocess.cpp | 2 |
18 files changed, 845 insertions, 205 deletions
diff --git a/src/zencore/include/zencore/process.h b/src/zencore/include/zencore/process.h index 3177f64c1..96afd5950 100644 --- a/src/zencore/include/zencore/process.h +++ b/src/zencore/include/zencore/process.h @@ -176,7 +176,7 @@ struct ProcessMetrics uint64_t PeakPagefileUsage = 0; }; -void GetProcessMetrics(ProcessHandle& Handle, ProcessMetrics& OutMetrics); +void GetProcessMetrics(const ProcessHandle& Handle, ProcessMetrics& OutMetrics); void process_forcelink(); // internal diff --git a/src/zencore/include/zencore/thread.h b/src/zencore/include/zencore/thread.h index d0d710ee8..d7262324f 100644 --- a/src/zencore/include/zencore/thread.h +++ b/src/zencore/include/zencore/thread.h @@ -28,9 +28,11 @@ class RwLock { public: void AcquireShared() noexcept; + bool TryAcquireShared() noexcept; void ReleaseShared() noexcept; void AcquireExclusive() noexcept; + bool TryAcquireExclusive() noexcept; void ReleaseExclusive() noexcept; struct SharedLockScope diff --git a/src/zencore/process.cpp b/src/zencore/process.cpp index 0c55e6c7e..29de107bd 100644 --- a/src/zencore/process.cpp +++ b/src/zencore/process.cpp @@ -1564,7 +1564,7 @@ WaitForThreads(uint64_t WaitTimeMs) } void -GetProcessMetrics(ProcessHandle& Handle, ProcessMetrics& OutMetrics) +GetProcessMetrics(const ProcessHandle& Handle, ProcessMetrics& OutMetrics) { #if ZEN_PLATFORM_WINDOWS FILETIME CreationTime; diff --git a/src/zencore/thread.cpp b/src/zencore/thread.cpp index 54459cbaa..f74791333 100644 --- a/src/zencore/thread.cpp +++ b/src/zencore/thread.cpp @@ -146,6 +146,12 @@ RwLock::AcquireShared() noexcept m_Mutex.lock_shared(); } +bool +RwLock::TryAcquireShared() noexcept +{ + return m_Mutex.try_lock_shared(); +} + void RwLock::ReleaseShared() noexcept { @@ -158,6 +164,12 @@ RwLock::AcquireExclusive() noexcept m_Mutex.lock(); } +bool +RwLock::TryAcquireExclusive() noexcept +{ + return m_Mutex.try_lock(); +} + void RwLock::ReleaseExclusive() noexcept { diff --git a/src/zenserver-test/hub-tests.cpp b/src/zenserver-test/hub-tests.cpp index 29c3b76ba..a372b11e5 100644 --- a/src/zenserver-test/hub-tests.cpp +++ b/src/zenserver-test/hub-tests.cpp @@ -44,7 +44,8 @@ TEST_CASE("hub.lifecycle.basic") HttpClient Client(Instance.GetBaseUri() + "/hub/"); HttpClient::Response Result = Client.Get("status"); - CHECK(Result); + REQUIRE(Result); + CHECK_EQ(Result.AsObject()["modules"].AsArrayView().Num(), 0u); } } @@ -71,6 +72,10 @@ TEST_CASE("hub.lifecycle.children") const uint16_t AbcPort = AbcResult["port"].AsUInt16(0); CHECK_NE(AbcPort, 0); + Result = Client.Get("modules/abc"); + REQUIRE(Result); + CHECK_EQ(Result.AsObject()["state"].AsString(), "provisioned"sv); + // This should be a fresh instance with no contents HttpClient AbcClient(fmt::format("http://localhost:{}", AbcPort)); @@ -231,9 +236,10 @@ TEST_CASE("hub.lifecycle.children") Result = Client.Post("modules/def/deprovision"); REQUIRE(Result); - // final sanity check that the hub is still responsive + // final sanity check that the hub is still responsive and all modules are gone Result = Client.Get("status"); - CHECK(Result); + REQUIRE(Result); + CHECK_EQ(Result.AsObject()["modules"].AsArrayView().Num(), 0u); } } diff --git a/src/zenserver-test/projectstore-tests.cpp b/src/zenserver-test/projectstore-tests.cpp index 5cc75c590..a37ecb6be 100644 --- a/src/zenserver-test/projectstore-tests.cpp +++ b/src/zenserver-test/projectstore-tests.cpp @@ -1027,7 +1027,7 @@ TEST_CASE("project.rpcappendop") std::string_view ProjectName, std::string_view OplogName, std::span<const CompressedBuffer> Attachments, - void* ServerProcessHandle, + const ProcessHandle& ServerProcessHandle, const std::filesystem::path& TempPath) { CompositeBuffer PackageMessage; { @@ -1054,7 +1054,8 @@ TEST_CASE("project.rpcappendop") Request.EndArray(); // "chunks" RequestPackage.SetObject(Request.Save()); - PackageMessage = CompositeBuffer(FormatPackageMessage(RequestPackage, FormatFlags::kAllowLocalReferences, ServerProcessHandle)); + PackageMessage = + CompositeBuffer(FormatPackageMessage(RequestPackage, FormatFlags::kAllowLocalReferences, ServerProcessHandle.Handle())); } HttpClient::Response Response = @@ -1063,8 +1064,8 @@ TEST_CASE("project.rpcappendop") }; { - HttpClient Client(Servers.GetInstance(0).GetBaseUri()); - void* ServerProcessHandle = Servers.GetInstance(0).GetProcessHandle(); + HttpClient Client(Servers.GetInstance(0).GetBaseUri()); + const ProcessHandle& ServerProcessHandle = Servers.GetInstance(0).GetProcessHandle(); MakeProject(Client, "proj0"); MakeOplog(Client, "proj0", "oplog0"); @@ -1108,8 +1109,8 @@ TEST_CASE("project.rpcappendop") } { - HttpClient Client(Servers.GetInstance(1).GetBaseUri()); - void* ServerProcessHandle = nullptr; // Force use of path for attachments passed on disk + HttpClient Client(Servers.GetInstance(1).GetBaseUri()); + ProcessHandle ServerProcessHandle; // Force use of path for attachments passed on disk MakeProject(Client, "proj0"); MakeOplog(Client, "proj0", "oplog0"); diff --git a/src/zenserver/frontend/html/compute/hub.html b/src/zenserver/frontend/html/compute/hub.html index 620349a2b..b15b34577 100644 --- a/src/zenserver/frontend/html/compute/hub.html +++ b/src/zenserver/frontend/html/compute/hub.html @@ -127,11 +127,11 @@ for (var i = 0; i < modules.length; i++) { var m = modules[i]; var moduleId = m.moduleId || ''; - var provisioned = m.provisioned; + var state = m.state || 'unprovisioned'; - var badge = provisioned + var badge = (state === 'provisioned') ? '<span class="status-badge active">Provisioned</span>' - : '<span class="status-badge inactive">Inactive</span>'; + : '<span class="status-badge inactive">' + escapeHtml(state.charAt(0).toUpperCase() + state.slice(1)) + '</span>'; var tr = document.createElement('tr'); tr.innerHTML = diff --git a/src/zenserver/frontend/html/pages/hub.js b/src/zenserver/frontend/html/pages/hub.js index f9e4fff33..00d156c5e 100644 --- a/src/zenserver/frontend/html/pages/hub.js +++ b/src/zenserver/frontend/html/pages/hub.js @@ -104,7 +104,7 @@ export class Page extends ZenPage { this._mod_table.add_row( m.moduleId || "", - m.provisioned ? "provisioned" : "inactive", + m.state || "unprovisioned", ); } } diff --git a/src/zenserver/hub/httphubservice.cpp b/src/zenserver/hub/httphubservice.cpp index 67ed0cfd8..5497bcf2b 100644 --- a/src/zenserver/hub/httphubservice.cpp +++ b/src/zenserver/hub/httphubservice.cpp @@ -37,10 +37,23 @@ HttpHubService::HttpHubService(Hub& Hub) : m_Hub(Hub) [this](HttpRouterRequest& Req) { CbObjectWriter Obj; Obj.BeginArray("modules"); - m_Hub.EnumerateModules([&Obj](StorageServerInstance& Instance) { + m_Hub.EnumerateModules([&Obj](std::string_view ModuleId, const Hub::InstanceInfo& Info) { Obj.BeginObject(); - Obj << "moduleId" << Instance.GetModuleId(); - Obj << "provisioned" << Instance.IsProvisioned(); + { + Obj << "moduleId" << ModuleId; + Obj << "state" << ToString(Info.State); + Obj.BeginObject("process_metrics"); + { + Obj << "MemoryBytes" << Info.Metrics.MemoryBytes; + Obj << "KernelTimeMs" << Info.Metrics.KernelTimeMs; + Obj << "UserTimeMs" << Info.Metrics.UserTimeMs; + Obj << "WorkingSetSize" << Info.Metrics.WorkingSetSize; + Obj << "PeakWorkingSetSize" << Info.Metrics.PeakWorkingSetSize; + Obj << "PagefileUsage" << Info.Metrics.PagefileUsage; + Obj << "PeakPagefileUsage" << Info.Metrics.PeakPagefileUsage; + } + Obj.EndObject(); + } Obj.EndObject(); }); Obj.EndArray(); @@ -176,8 +189,8 @@ HttpHubService::HandleRequest(zen::HttpServerRequest& Request) void HttpHubService::HandleModuleGet(HttpServerRequest& Request, std::string_view ModuleId) { - StorageServerInstance* Instance = nullptr; - if (!m_Hub.Find(ModuleId, &Instance)) + Hub::InstanceInfo InstanceInfo; + if (!m_Hub.Find(ModuleId, &InstanceInfo)) { Request.WriteResponse(HttpResponseCode::NotFound); return; @@ -187,26 +200,36 @@ HttpHubService::HandleModuleGet(HttpServerRequest& Request, std::string_view Mod // with a dangling pointer... CbObjectWriter Obj; - Obj << "moduleId" << Instance->GetModuleId(); - Obj << "provisioned" << Instance->IsProvisioned(); + Obj << "moduleId" << ModuleId; + Obj << "state" << ToString(InstanceInfo.State); Request.WriteResponse(HttpResponseCode::OK, Obj.Save()); } void HttpHubService::HandleModuleDelete(HttpServerRequest& Request, std::string_view ModuleId) { - StorageServerInstance* Instance = nullptr; - if (!m_Hub.Find(ModuleId, &Instance)) + Hub::InstanceInfo InstanceInfo; + if (!m_Hub.Find(ModuleId, &InstanceInfo)) { Request.WriteResponse(HttpResponseCode::NotFound); return; } - // TODO: deprovision and nuke all related storage + if (InstanceInfo.State == HubInstanceState::Provisioned) + { + std::string Reason; + if (!m_Hub.Deprovision(std::string(ModuleId), Reason)) + { + Request.WriteResponse(HttpResponseCode::InternalServerError, HttpContentType::kText, Reason); + return; + } + } + + // TODO: nuke all related storage CbObjectWriter Obj; - Obj << "moduleId" << Instance->GetModuleId(); - Obj << "provisioned" << Instance->IsProvisioned(); + Obj << "moduleId" << ModuleId; + Obj << "state" << ToString(InstanceInfo.State); Request.WriteResponse(HttpResponseCode::OK, Obj.Save()); } diff --git a/src/zenserver/hub/hub.cpp b/src/zenserver/hub/hub.cpp index c35fa61e8..b0208db1f 100644 --- a/src/zenserver/hub/hub.cpp +++ b/src/zenserver/hub/hub.cpp @@ -9,6 +9,7 @@ #include <zencore/fmtutils.h> #include <zencore/logging.h> #include <zencore/scopeguard.h> +#include <zencore/timer.h> ZEN_THIRD_PARTY_INCLUDES_START #include <EASTL/fixed_vector.h> @@ -150,6 +151,9 @@ Hub::Hub(const Configuration& Config, ZEN_ASSERT(uint64_t(Config.BasePortNumber) + Config.InstanceLimit <= std::numeric_limits<uint16_t>::max()); + m_InstanceLookup.reserve(Config.InstanceLimit); + m_ActiveInstances.reserve(Config.InstanceLimit); + m_FreePorts.resize(Config.InstanceLimit); std::iota(m_FreePorts.begin(), m_FreePorts.end(), Config.BasePortNumber); @@ -167,6 +171,7 @@ Hub::Hub(const Configuration& Config, } } #endif + m_WatchDog = std::thread([this]() { WatchDog(); }); } Hub::~Hub() @@ -175,26 +180,43 @@ Hub::~Hub() { ZEN_INFO("Hub service shutting down, deprovisioning any current instances"); + m_WatchDogEvent.Set(); + if (m_WatchDog.joinable()) + { + m_WatchDog.join(); + } + + m_WatchDog = {}; + + // WatchDog has been joined; no concurrent access is possible m_Lock.WithExclusiveLock([this] { - for (auto& [ModuleId, Instance] : m_Instances) + for (auto& [ModuleId, ActiveInstanceIndex] : m_InstanceLookup) { - uint16_t BasePort = Instance->GetBasePort(); - std::string BaseUri; // TODO? - - if (m_DeprovisionedModuleCallback) + std::unique_ptr<StorageServerInstance>& InstanceRaw = m_ActiveInstances[ActiveInstanceIndex]; { - try - { - m_DeprovisionedModuleCallback(ModuleId, HubProvisionedInstanceInfo{.BaseUri = BaseUri, .Port = BasePort}); - } - catch (const std::exception& Ex) + StorageServerInstance::ExclusiveLockedPtr Instance(InstanceRaw->LockExclusive(/*Wait*/ true)); + + uint16_t BasePort = InstanceRaw->GetBasePort(); + std::string BaseUri; // TODO? + + if (m_DeprovisionedModuleCallback) { - ZEN_ERROR("Deprovision callback for module {} failed. Reason: '{}'", ModuleId, Ex.what()); + try + { + m_DeprovisionedModuleCallback(ModuleId, HubProvisionedInstanceInfo{.BaseUri = BaseUri, .Port = BasePort}); + } + catch (const std::exception& Ex) + { + ZEN_ERROR("Deprovision callback for module {} failed. Reason: '{}'", ModuleId, Ex.what()); + } } + Instance.Deprovision(); } - Instance->Deprovision(); + InstanceRaw.reset(); } - m_Instances.clear(); + m_InstanceLookup.clear(); + m_ActiveInstances.clear(); + m_FreeActiveInstanceIndexes.clear(); }); } catch (const std::exception& e) @@ -206,20 +228,20 @@ Hub::~Hub() bool Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo, std::string& OutReason) { - StorageServerInstance* Instance = nullptr; - bool IsNewInstance = false; + StorageServerInstance::ExclusiveLockedPtr Instance; + bool IsNewInstance = false; + uint16_t AllocatedPort = 0; { RwLock::ExclusiveLockScope _(m_Lock); - uint16_t AllocatedPort = 0; - auto RestoreAllocatedPort = MakeGuard([this, &AllocatedPort]() { - if (AllocatedPort != 0) + auto RestoreAllocatedPort = MakeGuard([this, ModuleId, &IsNewInstance, &AllocatedPort]() { + if (IsNewInstance && AllocatedPort != 0 && !m_InstanceLookup.contains(std::string(ModuleId))) { m_FreePorts.push_back(AllocatedPort); AllocatedPort = 0; } }); - if (auto It = m_Instances.find(std::string(ModuleId)); It == m_Instances.end()) + if (auto It = m_InstanceLookup.find(std::string(ModuleId)); It == m_InstanceLookup.end()) { std::string Reason; if (!CanProvisionInstance(ModuleId, /* out */ Reason)) @@ -231,11 +253,12 @@ Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo, s return false; } + IsNewInstance = true; + AllocatedPort = m_FreePorts.front(); + ZEN_ASSERT(AllocatedPort != 0); m_FreePorts.pop_front(); - IsNewInstance = true; - auto NewInstance = std::make_unique<StorageServerInstance>( m_RunEnvironment, StorageServerInstance::Configuration{.BasePort = AllocatedPort, @@ -245,63 +268,110 @@ Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo, s .CoreLimit = m_Config.InstanceCoreLimit, .ConfigPath = m_Config.InstanceConfigPath}, ModuleId); + #if ZEN_PLATFORM_WINDOWS if (m_JobObject.IsValid()) { NewInstance->SetJobObject(&m_JobObject); } #endif - Instance = NewInstance.get(); - m_Instances.emplace(std::string(ModuleId), std::move(NewInstance)); - AllocatedPort = 0; + + Instance = NewInstance->LockExclusive(/*Wait*/ true); + + size_t ActiveInstanceIndex = (size_t)-1; + if (!m_FreeActiveInstanceIndexes.empty()) + { + ActiveInstanceIndex = m_FreeActiveInstanceIndexes.back(); + m_FreeActiveInstanceIndexes.pop_back(); + ZEN_ASSERT(m_ActiveInstances.size() > ActiveInstanceIndex); + m_ActiveInstances[ActiveInstanceIndex] = std::move(NewInstance); + } + else + { + ActiveInstanceIndex = m_ActiveInstances.size(); + m_ActiveInstances.emplace_back(std::move(NewInstance)); + } + ZEN_ASSERT(ActiveInstanceIndex != (size_t)-1); + m_InstanceLookup.insert_or_assign(std::string(ModuleId), ActiveInstanceIndex); ZEN_INFO("Created new storage server instance for module '{}'", ModuleId); + + const int CurrentInstanceCount = gsl::narrow_cast<int>(m_InstanceLookup.size()); + int CurrentMaxCount = m_MaxInstanceCount.load(); + const int NewMax = Max(CurrentMaxCount, CurrentInstanceCount); + + m_MaxInstanceCount.compare_exchange_weak(CurrentMaxCount, NewMax); } else { - Instance = It->second.get(); + const size_t ActiveInstanceIndex = It->second; + ZEN_ASSERT(m_ActiveInstances.size() > ActiveInstanceIndex); + + std::unique_ptr<StorageServerInstance>& InstanceRaw = m_ActiveInstances[ActiveInstanceIndex]; + Instance = InstanceRaw->LockExclusive(/*Wait*/ true); + AllocatedPort = InstanceRaw->GetBasePort(); } m_ProvisioningModules.emplace(std::string(ModuleId)); } - ZEN_ASSERT(Instance != nullptr); + ZEN_ASSERT(Instance); auto RemoveProvisioningModule = MakeGuard([&] { RwLock::ExclusiveLockScope _(m_Lock); m_ProvisioningModules.erase(std::string(ModuleId)); + if (IsNewInstance && AllocatedPort != 0 && !m_InstanceLookup.contains(std::string(ModuleId))) + { + m_FreePorts.push_back(AllocatedPort); + AllocatedPort = 0; + } }); - // NOTE: this is done while not holding the lock, as provisioning may take time + // NOTE: this is done while not holding the hub lock, as provisioning may take time // and we don't want to block other operations. We track which modules are being // provisioned using m_ProvisioningModules, and reject attempts to provision/deprovision // those modules while in this state. - UpdateStats(); - try { - Instance->Provision(); + Instance.Provision(); + Instance = {}; } catch (const std::exception& Ex) { ZEN_ERROR("Failed to provision storage server instance for module '{}': {}", ModuleId, Ex.what()); + Instance = {}; + if (IsNewInstance) { - // Clean up - RwLock::ExclusiveLockScope _(m_Lock); - if (auto It = m_Instances.find(std::string(ModuleId)); It != m_Instances.end()) + // Clean up failed instance provisioning + std::unique_ptr<StorageServerInstance> DestroyInstance; + { + RwLock::ExclusiveLockScope _(m_Lock); + if (auto It = m_InstanceLookup.find(std::string(ModuleId)); It != m_InstanceLookup.end()) + { + const size_t ActiveInstanceIndex = It->second; + ZEN_ASSERT(ActiveInstanceIndex < m_ActiveInstances.size()); + DestroyInstance = std::move(m_ActiveInstances[ActiveInstanceIndex]); + ZEN_ASSERT(DestroyInstance); + ZEN_ASSERT(!m_ActiveInstances[ActiveInstanceIndex]); + m_FreeActiveInstanceIndexes.push_back(ActiveInstanceIndex); + m_InstanceLookup.erase(It); + } + } + try + { + DestroyInstance.reset(); + } + catch (const std::exception& Ex) { - ZEN_ASSERT(It->second != nullptr); - uint16_t BasePort = It->second->GetBasePort(); - m_FreePorts.push_back(BasePort); - m_Instances.erase(It); + ZEN_ERROR("Failed to destroy instance for failed provision module '{}': {}", ModuleId, Ex.what()); } } return false; } - OutInfo.Port = Instance->GetBasePort(); + OutInfo.Port = AllocatedPort; // TODO: base URI? Would need to know what host name / IP to use if (m_ProvisionedModuleCallback) @@ -322,7 +392,8 @@ Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo, s bool Hub::Deprovision(const std::string& ModuleId, std::string& OutReason) { - std::unique_ptr<StorageServerInstance> Instance; + std::unique_ptr<StorageServerInstance> RawInstance; + StorageServerInstance::ExclusiveLockedPtr Instance; { RwLock::ExclusiveLockScope _(m_Lock); @@ -336,22 +407,31 @@ Hub::Deprovision(const std::string& ModuleId, std::string& OutReason) return false; } - if (auto It = m_Instances.find(ModuleId); It == m_Instances.end()) + if (auto It = m_InstanceLookup.find(ModuleId); It == m_InstanceLookup.end()) { ZEN_WARN("Attempted to deprovision non-existent module '{}'", ModuleId); - // Not found, OutReason should be empty + // Not found, OutReason left empty return false; } else { - Instance = std::move(It->second); - m_Instances.erase(It); + const size_t ActiveInstanceIndex = It->second; + ZEN_ASSERT(ActiveInstanceIndex < m_ActiveInstances.size()); + RawInstance = std::move(m_ActiveInstances[ActiveInstanceIndex]); + ZEN_ASSERT(RawInstance != nullptr); + m_FreeActiveInstanceIndexes.push_back(ActiveInstanceIndex); + m_InstanceLookup.erase(It); m_DeprovisioningModules.emplace(ModuleId); + + Instance = RawInstance->LockExclusive(/*Wait*/ true); } } - uint16_t BasePort = Instance->GetBasePort(); + ZEN_ASSERT(RawInstance); + ZEN_ASSERT(Instance); + + uint16_t BasePort = RawInstance->GetBasePort(); std::string BaseUri; // TODO? if (m_DeprovisionedModuleCallback) @@ -366,57 +446,82 @@ Hub::Deprovision(const std::string& ModuleId, std::string& OutReason) } } - // The module is deprovisioned outside the lock to avoid blocking other operations. + // The module is deprovisioned outside the hub lock to avoid blocking other operations. // // To ensure that no new provisioning can occur while we're deprovisioning, // we add the module ID to m_DeprovisioningModules and remove it once // deprovisioning is complete. auto _ = MakeGuard([&] { - RwLock::ExclusiveLockScope _(m_Lock); - m_DeprovisioningModules.erase(ModuleId); - m_FreePorts.push_back(BasePort); + { + RwLock::ExclusiveLockScope _(m_Lock); + m_DeprovisioningModules.erase(ModuleId); + m_FreePorts.push_back(BasePort); + } }); - Instance->Deprovision(); + Instance.Deprovision(); + + Instance = {}; return true; } bool -Hub::Find(std::string_view ModuleId, StorageServerInstance** OutInstance) +Hub::Find(std::string_view ModuleId, InstanceInfo* OutInstanceInfo) { RwLock::SharedLockScope _(m_Lock); - if (auto It = m_Instances.find(std::string(ModuleId)); It != m_Instances.end()) + if (auto It = m_InstanceLookup.find(std::string(ModuleId)); It != m_InstanceLookup.end()) { - if (OutInstance) + if (OutInstanceInfo) { - *OutInstance = It->second.get(); + const size_t ActiveInstanceIndex = It->second; + ZEN_ASSERT(ActiveInstanceIndex < m_ActiveInstances.size()); + const std::unique_ptr<StorageServerInstance>& Instance = m_ActiveInstances[ActiveInstanceIndex]; + ZEN_ASSERT(Instance); + InstanceInfo Info{ + Instance->GetState(), + std::chrono::system_clock::now() // TODO + }; + Instance->GetProcessMetrics(Info.Metrics); + + *OutInstanceInfo = Info; } return true; } - else if (OutInstance) - { - *OutInstance = nullptr; - } return false; } void -Hub::EnumerateModules(std::function<void(StorageServerInstance&)> Callback) +Hub::EnumerateModules(std::function<void(std::string_view ModuleId, const InstanceInfo&)> Callback) { - RwLock::SharedLockScope _(m_Lock); - for (auto& It : m_Instances) + std::vector<std::pair<std::string, InstanceInfo>> Infos; + { + RwLock::SharedLockScope _(m_Lock); + for (auto& [ModuleId, ActiveInstanceIndex] : m_InstanceLookup) + { + const std::unique_ptr<StorageServerInstance>& Instance = m_ActiveInstances[ActiveInstanceIndex]; + ZEN_ASSERT(Instance); + InstanceInfo Info{ + Instance->GetState(), + std::chrono::system_clock::now() // TODO + }; + Instance->GetProcessMetrics(Info.Metrics); + + Infos.push_back(std::make_pair(std::string(Instance->GetModuleId()), Info)); + } + } + + for (const std::pair<std::string, InstanceInfo>& Info : Infos) { - Callback(*It.second); + Callback(Info.first, Info.second); } } int Hub::GetInstanceCount() { - RwLock::SharedLockScope _(m_Lock); - return gsl::narrow_cast<int>(m_Instances.size()); + return m_Lock.WithSharedLock([this]() { return gsl::narrow_cast<int>(m_InstanceLookup.size()); }); } void @@ -424,13 +529,19 @@ Hub::UpdateCapacityMetrics() { m_HostMetrics = GetSystemMetrics(); - // Update per-instance metrics + // TODO: Should probably go into WatchDog and use atomic for update so it can be read without locks... + // Per-instance stats are already refreshed by WatchDog and are readable via the Find and EnumerateModules } void Hub::UpdateStats() { - m_Lock.WithSharedLock([this] { m_MaxInstanceCount = Max(m_MaxInstanceCount, gsl::narrow_cast<int>(m_Instances.size())); }); + int CurrentInstanceCount = m_Lock.WithSharedLock([this] { return gsl::narrow_cast<int>(m_InstanceLookup.size()); }); + int CurrentMaxCount = m_MaxInstanceCount.load(); + + int NewMax = Max(CurrentMaxCount, CurrentInstanceCount); + + m_MaxInstanceCount.compare_exchange_weak(CurrentMaxCount, NewMax); } bool @@ -450,19 +561,19 @@ Hub::CanProvisionInstance(std::string_view ModuleId, std::string& OutReason) return false; } - if (gsl::narrow_cast<int>(m_Instances.size()) >= m_Config.InstanceLimit) + if (gsl::narrow_cast<int>(m_InstanceLookup.size()) >= m_Config.InstanceLimit) { OutReason = fmt::format("instance limit ({}) exceeded", m_Config.InstanceLimit); return false; } - // Since deprovisioning happens outside the lock and we don't add the port back until the instance is full shut down we might be under - // the instance limit but all ports may be in use + // Since deprovisioning happens outside the lock and we don't return the port until the instance is fully shut down, we might be below + // the instance count limit but with no free ports available if (m_FreePorts.empty()) { OutReason = fmt::format("no free ports available, deprovisioning of instances might be in flight ({})", - m_Config.InstanceLimit - m_Instances.size()); + m_Config.InstanceLimit - m_InstanceLookup.size()); return false; } @@ -472,6 +583,71 @@ Hub::CanProvisionInstance(std::string_view ModuleId, std::string& OutReason) return true; } +void +Hub::WatchDog() +{ + constexpr uint64_t WatchDogWakeupTimeMs = 5000; + constexpr uint64_t WatchDogProcessingTimeMs = 500; + + size_t CheckInstanceIndex = 0; + while (!m_WatchDogEvent.Wait(WatchDogWakeupTimeMs)) + { + try + { + size_t MaxCheckCount = m_Lock.WithSharedLock([this]() { return m_InstanceLookup.size(); }); + + Stopwatch Timer; + while (MaxCheckCount-- > 0 && Timer.GetElapsedTimeMs() < WatchDogProcessingTimeMs && !m_WatchDogEvent.Wait(5)) + { + StorageServerInstance::SharedLockedPtr LockedInstance; + m_Lock.WithSharedLock([this, &CheckInstanceIndex, &LockedInstance]() { + if (m_InstanceLookup.empty()) + { + return; + } + + size_t MaxLoopCount = m_ActiveInstances.size(); + StorageServerInstance* Instance = nullptr; + while (MaxLoopCount-- > 0 && !Instance) + { + CheckInstanceIndex++; + if (CheckInstanceIndex >= m_ActiveInstances.size()) + { + CheckInstanceIndex = 0; + } + Instance = (CheckInstanceIndex < m_ActiveInstances.size()) ? m_ActiveInstances[CheckInstanceIndex].get() : nullptr; + } + + if (Instance) + { + LockedInstance = Instance->LockShared(/*Wait*/ false); + } + }); + + if (LockedInstance) + { + if (LockedInstance.IsRunning()) + { + LockedInstance.UpdateMetrics(); + } + else if (LockedInstance.GetState() == HubInstanceState::Provisioned) + { + // Process is not running but state says it should be — instance died unexpectedly. + // TODO: Track and attempt recovery. + } + // else: transitional state (Provisioning, Deprovisioning, Hibernating, Waking) — expected, skip. + LockedInstance = {}; + } + } + } + catch (const std::exception& Ex) + { + // TODO: Catch specific errors such as asserts, OOM, OOD, system_error etc + ZEN_ERROR("Hub watchdog threw exception: {}", Ex.what()); + } + } +} + #if ZEN_WITH_TESTS TEST_SUITE_BEGIN("server.hub"); @@ -507,7 +683,9 @@ TEST_CASE("hub.provision_basic") REQUIRE_MESSAGE(ProvisionResult, Reason); CHECK_NE(Info.Port, 0); CHECK_EQ(HubInstance->GetInstanceCount(), 1); - CHECK(HubInstance->Find("module_a")); + Hub::InstanceInfo InstanceInfo; + REQUIRE(HubInstance->Find("module_a", &InstanceInfo)); + CHECK_EQ(InstanceInfo.State, HubInstanceState::Provisioned); const bool DeprovisionResult = HubInstance->Deprovision("module_a", Reason); CHECK(DeprovisionResult); @@ -541,7 +719,9 @@ TEST_CASE("hub.provision_config") REQUIRE_MESSAGE(ProvisionResult, Reason); CHECK_NE(Info.Port, 0); CHECK_EQ(HubInstance->GetInstanceCount(), 1); - CHECK(HubInstance->Find("module_a")); + Hub::InstanceInfo InstanceInfo; + REQUIRE(HubInstance->Find("module_a", &InstanceInfo)); + CHECK_EQ(InstanceInfo.State, HubInstanceState::Provisioned); HttpClient Client(fmt::format("http://127.0.0.1:{}{}", Info.Port, Info.BaseUri)); HttpClient::Response TestResponse = Client.Get("/status/builds"); @@ -660,7 +840,10 @@ TEST_CASE("hub.enumerate_modules") REQUIRE_MESSAGE(HubInstance->Provision("enum_b", Info, Reason), Reason); std::vector<std::string> Ids; - HubInstance->EnumerateModules([&](StorageServerInstance& Instance) { Ids.push_back(std::string(Instance.GetModuleId())); }); + HubInstance->EnumerateModules([&](std::string_view ModuleId, const Hub::InstanceInfo& Info) { + Ids.push_back(std::string(ModuleId)); + CHECK_EQ(Info.State, HubInstanceState::Provisioned); + }); CHECK_EQ(Ids.size(), 2u); const bool FoundA = std::find(Ids.begin(), Ids.end(), "enum_a") != Ids.end(); const bool FoundB = std::find(Ids.begin(), Ids.end(), "enum_b") != Ids.end(); @@ -669,7 +852,10 @@ TEST_CASE("hub.enumerate_modules") HubInstance->Deprovision("enum_a", Reason); Ids.clear(); - HubInstance->EnumerateModules([&](StorageServerInstance& Instance) { Ids.push_back(std::string(Instance.GetModuleId())); }); + HubInstance->EnumerateModules([&](std::string_view ModuleId, const Hub::InstanceInfo& Info) { + Ids.push_back(std::string(ModuleId)); + CHECK_EQ(Info.State, HubInstanceState::Provisioned); + }); REQUIRE_EQ(Ids.size(), 1u); CHECK_EQ(Ids[0], "enum_b"); } @@ -936,6 +1122,59 @@ TEST_CASE("hub.job_object") } # endif // ZEN_PLATFORM_WINDOWS +TEST_CASE("hub.instance_state_basic") +{ + ScopedTemporaryDirectory TempDir; + Hub::Configuration Config; + Config.BasePortNumber = 22400; + std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config); + + HubProvisionedInstanceInfo ProvInfo; + Hub::InstanceInfo Info; + std::string Reason; + + REQUIRE_MESSAGE(HubInstance->Provision("state_a", ProvInfo, Reason), Reason); + + REQUIRE(HubInstance->Find("state_a", &Info)); + CHECK_EQ(Info.State, HubInstanceState::Provisioned); + + HubInstance->Deprovision("state_a", Reason); + CHECK_FALSE(HubInstance->Find("state_a")); +} + +TEST_CASE("hub.instance_state_enumerate") +{ + ScopedTemporaryDirectory TempDir; + Hub::Configuration Config; + Config.BasePortNumber = 22500; + std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config); + + HubProvisionedInstanceInfo ProvInfo; + std::string Reason; + REQUIRE_MESSAGE(HubInstance->Provision("estate_a", ProvInfo, Reason), Reason); + REQUIRE_MESSAGE(HubInstance->Provision("estate_b", ProvInfo, Reason), Reason); + + int ProvisionedCount = 0; + HubInstance->EnumerateModules([&](std::string_view, const Hub::InstanceInfo& InstanceInfo) { + if (InstanceInfo.State == HubInstanceState::Provisioned) + { + ProvisionedCount++; + } + }); + CHECK_EQ(ProvisionedCount, 2); + + HubInstance->Deprovision("estate_a", Reason); + + ProvisionedCount = 0; + HubInstance->EnumerateModules([&](std::string_view, const Hub::InstanceInfo& InstanceInfo) { + if (InstanceInfo.State == HubInstanceState::Provisioned) + { + ProvisionedCount++; + } + }); + CHECK_EQ(ProvisionedCount, 1); +} + TEST_SUITE_END(); void diff --git a/src/zenserver/hub/hub.h b/src/zenserver/hub/hub.h index 7232b99ee..7094378f6 100644 --- a/src/zenserver/hub/hub.h +++ b/src/zenserver/hub/hub.h @@ -2,6 +2,7 @@ #pragma once +#include "hubinstancestate.h" #include "resourcemetrics.h" #include <zencore/system.h> @@ -11,6 +12,7 @@ #include <filesystem> #include <functional> #include <memory> +#include <thread> #include <unordered_map> #include <unordered_set> @@ -38,15 +40,15 @@ public: /** Enable or disable the use of a Windows Job Object for child process management. * When enabled, all spawned child processes are assigned to a job object with * JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE, ensuring children are terminated if the hub - * crashes or is force-killed. Must be called before Initialize(). No-op on non-Windows. + * crashes or is force-killed. */ bool UseJobObject = true; uint16_t BasePortNumber = 21000; int InstanceLimit = 1000; - uint32_t InstanceHttpThreadCount = 0; // Deduce from core count - int InstanceCoreLimit = 0; // Use hardware core count + uint32_t InstanceHttpThreadCount = 0; // Automatic + int InstanceCoreLimit = 0; // Automatic std::filesystem::path InstanceConfigPath; std::string HydrationTargetSpecification; }; @@ -62,6 +64,13 @@ public: Hub(const Hub&) = delete; Hub& operator=(const Hub&) = delete; + struct InstanceInfo + { + HubInstanceState State = HubInstanceState::Unprovisioned; + std::chrono::system_clock::time_point ProvisionTime; + ProcessMetrics Metrics; + }; + /** * Provision a storage server instance for the given module ID. * @@ -81,29 +90,24 @@ public: bool Deprovision(const std::string& ModuleId, std::string& OutReason); /** - * Find a storage server instance for the given module ID. - * - * Beware that as this returns a raw pointer to the instance, the caller must ensure - * that the instance is not deprovisioned while in use. + * Find info about storage server instance for the given module ID. * * @param ModuleId The ID of the module to find. - * @param OutInstance If found, the instance will be returned here. + * @param OutInstanceInfo If found, the instance info will be returned here. * @return true if the instance was found, false otherwise. */ - bool Find(std::string_view ModuleId, StorageServerInstance** OutInstance = nullptr); + bool Find(std::string_view ModuleId, InstanceInfo* OutInstanceInfo = nullptr); /** - * Enumerate all storage server instances. + * Enumerate a snapshot of all storage server instances. * - * @param Callback The callback to invoke for each instance. Note that you should - * not do anything heavyweight in the callback as it is invoked while holding - * a shared lock. + * @param Callback The callback to invoke for each instance. */ - void EnumerateModules(std::function<void(StorageServerInstance&)> Callback); + void EnumerateModules(std::function<void(std::string_view ModuleId, const InstanceInfo&)> Callback); int GetInstanceCount(); - int GetMaxInstanceCount() const { return m_MaxInstanceCount; } + int GetMaxInstanceCount() const { return m_MaxInstanceCount.load(); } const Configuration& GetConfig() const { return m_Config; } @@ -120,14 +124,20 @@ private: #if ZEN_PLATFORM_WINDOWS JobObject m_JobObject; #endif - RwLock m_Lock; - std::unordered_map<std::string, std::unique_ptr<StorageServerInstance>> m_Instances; - std::unordered_set<std::string> m_DeprovisioningModules; - std::unordered_set<std::string> m_ProvisioningModules; - ResourceMetrics m_ResourceLimits; - SystemMetrics m_HostMetrics; - int m_MaxInstanceCount = 0; - std::deque<uint16_t> m_FreePorts; + RwLock m_Lock; + std::unordered_map<std::string, size_t> m_InstanceLookup; + std::unordered_set<std::string> m_DeprovisioningModules; + std::unordered_set<std::string> m_ProvisioningModules; + std::vector<std::unique_ptr<StorageServerInstance>> m_ActiveInstances; + std::vector<size_t> m_FreeActiveInstanceIndexes; + ResourceMetrics m_ResourceLimits; + SystemMetrics m_HostMetrics; + std::atomic<int> m_MaxInstanceCount = 0; + std::deque<uint16_t> m_FreePorts; + std::thread m_WatchDog; + + Event m_WatchDogEvent; + void WatchDog(); void UpdateStats(); void UpdateCapacityMetrics(); diff --git a/src/zenserver/hub/hubinstancestate.cpp b/src/zenserver/hub/hubinstancestate.cpp new file mode 100644 index 000000000..8337f73a8 --- /dev/null +++ b/src/zenserver/hub/hubinstancestate.cpp @@ -0,0 +1,33 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "hubinstancestate.h" + +#include <zencore/assertfmt.h> + +namespace zen { + +std::string_view +ToString(HubInstanceState State) +{ + switch (State) + { + case HubInstanceState::Unprovisioned: + return "unprovisioned"; + case HubInstanceState::Provisioning: + return "provisioning"; + case HubInstanceState::Provisioned: + return "provisioned"; + case HubInstanceState::Hibernating: + return "hibernating"; + case HubInstanceState::Hibernated: + return "hibernated"; + case HubInstanceState::Waking: + return "waking"; + case HubInstanceState::Deprovisioning: + return "deprovisioning"; + } + ZEN_ASSERT(false); + return "unknown"; +} + +} // namespace zen diff --git a/src/zenserver/hub/hubinstancestate.h b/src/zenserver/hub/hubinstancestate.h new file mode 100644 index 000000000..ce1f222bd --- /dev/null +++ b/src/zenserver/hub/hubinstancestate.h @@ -0,0 +1,23 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <cstdint> +#include <string_view> + +namespace zen { + +enum class HubInstanceState : uint32_t +{ + Unprovisioned, // Initial state; process not running + Provisioning, // Hydrating and spawning process + Provisioned, // Process running and serving requests + Hibernating, // Shutting down process, preserving data on disk + Hibernated, // Process stopped, data preserved; can be woken + Waking, // Starting process from preserved data + Deprovisioning, // Shutting down process and cleaning up data +}; + +std::string_view ToString(HubInstanceState State); + +} // namespace zen diff --git a/src/zenserver/hub/storageserverinstance.cpp b/src/zenserver/hub/storageserverinstance.cpp index c54322683..bab501429 100644 --- a/src/zenserver/hub/storageserverinstance.cpp +++ b/src/zenserver/hub/storageserverinstance.cpp @@ -57,72 +57,82 @@ StorageServerInstance::SpawnServerProcess() } void -StorageServerInstance::Provision() +StorageServerInstance::GetProcessMetrics(ProcessMetrics& OutMetrics) const { - RwLock::ExclusiveLockScope _(m_Lock); + OutMetrics.MemoryBytes = m_MemoryBytes.load(); + OutMetrics.KernelTimeMs = m_KernelTimeMs.load(); + OutMetrics.UserTimeMs = m_UserTimeMs.load(); + OutMetrics.WorkingSetSize = m_WorkingSetSize.load(); + OutMetrics.PeakWorkingSetSize = m_PeakWorkingSetSize.load(); + OutMetrics.PagefileUsage = m_PagefileUsage.load(); + OutMetrics.PeakPagefileUsage = m_PeakPagefileUsage.load(); +} - if (m_IsProvisioned) +void +StorageServerInstance::ProvisionLocked() +{ + if (m_State.load() == HubInstanceState::Provisioned) { ZEN_WARN("Storage server instance for module '{}' is already provisioned", m_ModuleId); - return; } - if (m_IsHibernated) + if (m_State.load() == HubInstanceState::Hibernated) { - WakeLocked(); + if (WakeLocked()) + { + return; + } + // Wake failed; proceed with a fresh provision (discards hibernated data) + m_State = HubInstanceState::Unprovisioned; } - else + else if (m_State.load() != HubInstanceState::Unprovisioned) { - ZEN_INFO("Provisioning storage server instance for module '{}', at '{}'", m_ModuleId, m_BaseDir); - - Hydrate(); - - SpawnServerProcess(); + ZEN_WARN("Storage server instance for module '{}' is in unexpected state '{}', cannot provision", + m_ModuleId, + ToString(m_State.load())); + return; } - m_IsProvisioned = true; + ZEN_INFO("Provisioning storage server instance for module '{}', at '{}'", m_ModuleId, m_BaseDir); + + m_State = HubInstanceState::Provisioning; + Hydrate(); + SpawnServerProcess(); + m_State = HubInstanceState::Provisioned; } void -StorageServerInstance::Deprovision() +StorageServerInstance::DeprovisionLocked() { - RwLock::ExclusiveLockScope _(m_Lock); - - if (!m_IsProvisioned) + if (m_State.load() != HubInstanceState::Provisioned) { - ZEN_WARN("Attempted to deprovision storage server instance for module '{}' which is not provisioned", m_ModuleId); - + ZEN_WARN("Attempted to deprovision storage server instance for module '{}' which is not provisioned (state: '{}')", + m_ModuleId, + ToString(m_State.load())); return; } ZEN_INFO("Deprovisioning storage server instance for module '{}'", m_ModuleId); + m_State = HubInstanceState::Deprovisioning; m_ServerInstance.Shutdown(); Dehydrate(); - m_IsProvisioned = false; + m_State = HubInstanceState::Unprovisioned; } void -StorageServerInstance::Hibernate() +StorageServerInstance::HibernateLocked() { // Signal server to shut down, but keep data around for later wake - RwLock::ExclusiveLockScope _(m_Lock); - - if (!m_IsProvisioned) - { - ZEN_WARN("Attempted to hibernate storage server instance for module '{}' which is not provisioned", m_ModuleId); - - return; - } - - if (m_IsHibernated) + if (m_State.load() != HubInstanceState::Provisioned) { - ZEN_WARN("Storage server instance for module '{}' is already hibernated", m_ModuleId); - + ZEN_WARN("Attempted to hibernate storage server instance for module '{}' which is not provisioned (state: '{}')", + m_ModuleId, + ToString(m_State.load())); return; } @@ -135,52 +145,46 @@ StorageServerInstance::Hibernate() return; } + m_State = HubInstanceState::Hibernating; try { m_ServerInstance.Shutdown(); - - m_IsHibernated = true; - m_IsProvisioned = false; - + m_State = HubInstanceState::Hibernated; return; } catch (const std::exception& Ex) { ZEN_ERROR("Failed to hibernate storage server instance for module '{}': {}", m_ModuleId, Ex.what()); + m_State = HubInstanceState::Provisioned; // Shutdown failed; instance is still running } } -void -StorageServerInstance::Wake() -{ - RwLock::ExclusiveLockScope _(m_Lock); - WakeLocked(); -} - -void +bool StorageServerInstance::WakeLocked() { // Start server in-place using existing data - if (!m_IsHibernated) + if (m_State.load() != HubInstanceState::Hibernated) { ZEN_WARN("Attempted to wake storage server instance for module '{}' which is not hibernated", m_ModuleId); - return; + return true; // Instance is already usable (noop success) } ZEN_ASSERT_FORMAT(!m_ServerInstance.IsRunning(), "Storage server instance for module '{}' is already running", m_ModuleId); + m_State = HubInstanceState::Waking; try { SpawnServerProcess(); - m_IsHibernated = false; + m_State = HubInstanceState::Provisioned; + return true; } catch (const std::exception& Ex) { ZEN_ERROR("Failed to wake storage server instance for module '{}': {}", m_ModuleId, Ex.what()); - - // TODO: this instance should be marked as invalid + m_State = HubInstanceState::Hibernated; + return false; } } @@ -210,4 +214,193 @@ StorageServerInstance::Dehydrate() Hydrator->Dehydrate(); } +StorageServerInstance::SharedLockedPtr::SharedLockedPtr() : m_Lock(nullptr), m_Instance(nullptr) +{ +} + +StorageServerInstance::SharedLockedPtr::SharedLockedPtr(RwLock& Lock, StorageServerInstance* Instance, bool Wait) +: m_Lock(nullptr) +, m_Instance(nullptr) +{ + ZEN_ASSERT(Instance != nullptr); + if (Wait) + { + Lock.AcquireShared(); + m_Lock = &Lock; + m_Instance = Instance; + } + else + { + if (Lock.TryAcquireShared()) + { + m_Lock = &Lock; + m_Instance = Instance; + } + } +} + +StorageServerInstance::SharedLockedPtr::SharedLockedPtr(SharedLockedPtr&& Rhs) : m_Lock(Rhs.m_Lock), m_Instance(Rhs.m_Instance) +{ + Rhs.m_Lock = nullptr; + Rhs.m_Instance = nullptr; +} + +StorageServerInstance::SharedLockedPtr::~SharedLockedPtr() +{ + if (m_Lock != nullptr) + { + m_Lock->ReleaseShared(); + m_Lock = nullptr; + } + m_Instance = nullptr; +} + +StorageServerInstance::SharedLockedPtr& +StorageServerInstance::SharedLockedPtr::operator=(SharedLockedPtr&& Rhs) +{ + if (m_Lock) + { + m_Lock->ReleaseShared(); + m_Lock = nullptr; + m_Instance = nullptr; + } + m_Lock = Rhs.m_Lock; + m_Instance = Rhs.m_Instance; + Rhs.m_Lock = nullptr; + Rhs.m_Instance = nullptr; + return *this; +} + +std::string_view +StorageServerInstance::SharedLockedPtr::GetModuleId() const +{ + ZEN_ASSERT(m_Instance != nullptr); + return m_Instance->m_ModuleId; +} + +bool +StorageServerInstance::SharedLockedPtr::IsRunning() const +{ + ZEN_ASSERT(m_Instance != nullptr); + return m_Instance->m_State.load() == HubInstanceState::Provisioned && m_Instance->m_ServerInstance.IsRunning(); +} + +void +StorageServerInstance::UpdateMetricsLocked() +{ + if (m_State.load() == HubInstanceState::Provisioned) + { + ProcessMetrics Metrics; + zen::GetProcessMetrics(m_ServerInstance.GetProcessHandle(), Metrics); + + m_MemoryBytes.store(Metrics.MemoryBytes); + m_KernelTimeMs.store(Metrics.KernelTimeMs); + m_UserTimeMs.store(Metrics.UserTimeMs); + m_WorkingSetSize.store(Metrics.WorkingSetSize); + m_PeakWorkingSetSize.store(Metrics.PeakWorkingSetSize); + m_PagefileUsage.store(Metrics.PagefileUsage); + m_PeakPagefileUsage.store(Metrics.PeakPagefileUsage); + } + // TODO: Resource metrics... +} + +StorageServerInstance::ExclusiveLockedPtr::ExclusiveLockedPtr() : m_Lock(nullptr), m_Instance(nullptr) +{ +} + +StorageServerInstance::ExclusiveLockedPtr::ExclusiveLockedPtr(RwLock& Lock, StorageServerInstance* Instance, bool Wait) +: m_Lock(nullptr) +, m_Instance(nullptr) +{ + ZEN_ASSERT(Instance != nullptr); + if (Wait) + { + Lock.AcquireExclusive(); + m_Lock = &Lock; + m_Instance = Instance; + } + else + { + if (Lock.TryAcquireExclusive()) + { + m_Lock = &Lock; + m_Instance = Instance; + } + } +} + +StorageServerInstance::ExclusiveLockedPtr::ExclusiveLockedPtr(ExclusiveLockedPtr&& Rhs) : m_Lock(Rhs.m_Lock), m_Instance(Rhs.m_Instance) +{ + Rhs.m_Lock = nullptr; + Rhs.m_Instance = nullptr; +} + +StorageServerInstance::ExclusiveLockedPtr::~ExclusiveLockedPtr() +{ + if (m_Lock != nullptr) + { + m_Lock->ReleaseExclusive(); + m_Lock = nullptr; + } + m_Instance = nullptr; +} + +StorageServerInstance::ExclusiveLockedPtr& +StorageServerInstance::ExclusiveLockedPtr::operator=(ExclusiveLockedPtr&& Rhs) +{ + if (m_Lock) + { + m_Lock->ReleaseExclusive(); + m_Lock = nullptr; + m_Instance = nullptr; + } + m_Lock = Rhs.m_Lock; + m_Instance = Rhs.m_Instance; + Rhs.m_Lock = nullptr; + Rhs.m_Instance = nullptr; + return *this; +} + +std::string_view +StorageServerInstance::ExclusiveLockedPtr::GetModuleId() const +{ + ZEN_ASSERT(m_Instance != nullptr); + return m_Instance->m_ModuleId; +} + +bool +StorageServerInstance::ExclusiveLockedPtr::IsRunning() const +{ + ZEN_ASSERT(m_Instance != nullptr); + return m_Instance->m_State.load() == HubInstanceState::Provisioned && m_Instance->m_ServerInstance.IsRunning(); +} + +void +StorageServerInstance::ExclusiveLockedPtr::Provision() +{ + ZEN_ASSERT(m_Instance != nullptr); + m_Instance->ProvisionLocked(); +} + +void +StorageServerInstance::ExclusiveLockedPtr::Deprovision() +{ + ZEN_ASSERT(m_Instance != nullptr); + m_Instance->DeprovisionLocked(); +} + +void +StorageServerInstance::ExclusiveLockedPtr::Hibernate() +{ + ZEN_ASSERT(m_Instance != nullptr); + m_Instance->HibernateLocked(); +} + +bool +StorageServerInstance::ExclusiveLockedPtr::Wake() +{ + ZEN_ASSERT(m_Instance != nullptr); + return m_Instance->WakeLocked(); +} + } // namespace zen diff --git a/src/zenserver/hub/storageserverinstance.h b/src/zenserver/hub/storageserverinstance.h index 9b8aa7ac9..bf6fff8a0 100644 --- a/src/zenserver/hub/storageserverinstance.h +++ b/src/zenserver/hub/storageserverinstance.h @@ -2,6 +2,7 @@ #pragma once +#include "hubinstancestate.h" #include "resourcemetrics.h" #include <zenutil/zenserverprocess.h> @@ -26,44 +27,137 @@ public: uint16_t BasePort; std::filesystem::path HydrationTempPath; std::string HydrationTargetSpecification; - uint32_t HttpThreadCount = 0; // Deduce from core count - int CoreLimit = 0; // Use hardware core count + uint32_t HttpThreadCount = 0; // Automatic + int CoreLimit = 0; // Automatic std::filesystem::path ConfigPath; }; StorageServerInstance(ZenServerEnvironment& RunEnvironment, const Configuration& Config, std::string_view ModuleId); ~StorageServerInstance(); - void Provision(); - void Deprovision(); - - void Hibernate(); - void Wake(); - const ResourceMetrics& GetResourceMetrics() const { return m_ResourceMetrics; } inline std::string_view GetModuleId() const { return m_ModuleId; } - inline bool IsProvisioned() const { return m_IsProvisioned.load(); } - - inline uint16_t GetBasePort() const { return m_ServerInstance.GetBasePort(); } + inline HubInstanceState GetState() const { return m_State.load(); } + inline uint16_t GetBasePort() const { return m_Config.BasePort; }; + void GetProcessMetrics(ProcessMetrics& OutMetrics) const; #if ZEN_PLATFORM_WINDOWS void SetJobObject(JobObject* InJobObject) { m_JobObject = InJobObject; } #endif + class SharedLockedPtr + { + public: + SharedLockedPtr(const SharedLockedPtr&) = delete; + SharedLockedPtr(); + + SharedLockedPtr(RwLock& Lock, StorageServerInstance* Instance, bool Wait); + + SharedLockedPtr(SharedLockedPtr&& Rhs); + ~SharedLockedPtr(); + + SharedLockedPtr& operator=(const SharedLockedPtr&) = delete; + SharedLockedPtr& operator =(SharedLockedPtr&& Rhs); + + operator bool() const { return m_Instance != nullptr; } + + std::string_view GetModuleId() const; + HubInstanceState GetState() const + { + ZEN_ASSERT(m_Instance); + return m_Instance->m_State.load(); + } + bool IsRunning() const; + + const ResourceMetrics& GetResourceMetrics() const + { + ZEN_ASSERT(m_Instance); + return m_Instance->m_ResourceMetrics; + } + void UpdateMetrics() + { + ZEN_ASSERT(m_Instance); + return m_Instance->UpdateMetricsLocked(); + } + + private: + RwLock* m_Lock = nullptr; + StorageServerInstance* m_Instance = nullptr; + }; + + [[nodiscard]] SharedLockedPtr LockShared(bool Wait) { return SharedLockedPtr(m_Lock, this, Wait); } + + class ExclusiveLockedPtr + { + public: + ExclusiveLockedPtr(const ExclusiveLockedPtr&) = delete; + ExclusiveLockedPtr(); + + ExclusiveLockedPtr(RwLock& Lock, StorageServerInstance* Instance, bool Wait); + + ExclusiveLockedPtr(ExclusiveLockedPtr&& Rhs); + ~ExclusiveLockedPtr(); + + ExclusiveLockedPtr& operator=(const ExclusiveLockedPtr&) = delete; + ExclusiveLockedPtr& operator =(ExclusiveLockedPtr&& Rhs); + + operator bool() const { return m_Instance != nullptr; } + + std::string_view GetModuleId() const; + HubInstanceState GetState() const + { + ZEN_ASSERT(m_Instance); + return m_Instance->m_State.load(); + } + bool IsRunning() const; + + const ResourceMetrics& GetResourceMetrics() const + { + ZEN_ASSERT(m_Instance); + return m_Instance->m_ResourceMetrics; + } + + void Provision(); + void Deprovision(); + void Hibernate(); + bool Wake(); + + private: + RwLock* m_Lock = nullptr; + StorageServerInstance* m_Instance = nullptr; + }; + + [[nodiscard]] ExclusiveLockedPtr LockExclusive(bool Wait) { return ExclusiveLockedPtr(m_Lock, this, Wait); } + private: - void WakeLocked(); - RwLock m_Lock; + void ProvisionLocked(); + void DeprovisionLocked(); + + void HibernateLocked(); + [[nodiscard]] bool WakeLocked(); + + void UpdateMetricsLocked(); + + mutable RwLock m_Lock; const Configuration m_Config; std::string m_ModuleId; ZenServerInstance m_ServerInstance; - std::atomic<bool> m_IsProvisioned{false}; - std::atomic<bool> m_IsHibernated{false}; - std::filesystem::path m_BaseDir; + std::atomic<HubInstanceState> m_State{HubInstanceState::Unprovisioned}; + std::filesystem::path m_BaseDir; std::filesystem::path m_TempDir; ResourceMetrics m_ResourceMetrics; + + std::atomic<uint64_t> m_MemoryBytes = 0; + std::atomic<uint64_t> m_KernelTimeMs = 0; + std::atomic<uint64_t> m_UserTimeMs = 0; + std::atomic<uint64_t> m_WorkingSetSize = 0; + std::atomic<uint64_t> m_PeakWorkingSetSize = 0; + std::atomic<uint64_t> m_PagefileUsage = 0; + std::atomic<uint64_t> m_PeakPagefileUsage = 0; + #if ZEN_PLATFORM_WINDOWS JobObject* m_JobObject = nullptr; #endif @@ -72,6 +166,9 @@ private: void Hydrate(); void Dehydrate(); + + friend class SharedLockedPtr; + friend class ExclusiveLockedPtr; }; } // namespace zen diff --git a/src/zenserver/hub/zenhubserver.h b/src/zenserver/hub/zenhubserver.h index 33d813122..d44a13d86 100644 --- a/src/zenserver/hub/zenhubserver.h +++ b/src/zenserver/hub/zenhubserver.h @@ -29,8 +29,8 @@ struct ZenHubServerConfig : public ZenServerConfig int HubInstanceLimit = 1000; bool HubUseJobObject = true; std::string HubInstanceHttpClass = "asio"; - uint32_t HubInstanceHttpThreadCount = 0; // Deduce from core count - int HubInstanceCoreLimit = 0; // Use hardware core count + uint32_t HubInstanceHttpThreadCount = 0; // Automatic + int HubInstanceCoreLimit = 0; // Automatic std::filesystem::path HubInstanceConfigPath; // Path to Lua config file std::string HydrationTargetSpecification; // hydration/dehydration target specification }; diff --git a/src/zenutil/include/zenutil/zenserverprocess.h b/src/zenutil/include/zenutil/zenserverprocess.h index 5528b766c..308ae0ef2 100644 --- a/src/zenutil/include/zenutil/zenserverprocess.h +++ b/src/zenutil/include/zenutil/zenserverprocess.h @@ -115,22 +115,23 @@ struct ZenServerInstance ZenServerInstance(ZenServerEnvironment& TestEnvironment, ServerMode Mode = ServerMode::kStorageServer); ~ZenServerInstance(); - int Shutdown(); - bool SignalShutdown(std::error_code& OutEc); - uint16_t WaitUntilReady(); - [[nodiscard]] bool WaitUntilReady(int Timeout); - [[nodiscard]] bool WaitUntilExited(int Timeout, std::error_code& OutEc); - void EnableTermination() { m_Terminate = true; } - void EnableShutdownOnDestroy() { m_ShutdownOnDestroy = true; } - void DisableShutdownOnDestroy() { m_ShutdownOnDestroy = false; } - void Detach(); - inline int GetPid() const { return m_Process.Pid(); } - inline void SetOwnerPid(int Pid) { m_OwnerPid = Pid; } - void* GetProcessHandle() const { return m_Process.Handle(); } + int Shutdown(); + bool SignalShutdown(std::error_code& OutEc); + uint16_t WaitUntilReady(); + [[nodiscard]] bool WaitUntilReady(int Timeout); + [[nodiscard]] bool WaitUntilExited(int Timeout, std::error_code& OutEc); + void EnableTermination() { m_Terminate = true; } + void EnableShutdownOnDestroy() { m_ShutdownOnDestroy = true; } + void DisableShutdownOnDestroy() { m_ShutdownOnDestroy = false; } + void Detach(); + inline int GetPid() const { return m_Process.Pid(); } + inline void SetOwnerPid(int Pid) { m_OwnerPid = Pid; } + const ProcessHandle& GetProcessHandle() const { return m_Process; } + #if ZEN_PLATFORM_WINDOWS void SetJobObject(JobObject* Job) { m_JobObject = Job; } #endif - bool IsRunning(); + bool IsRunning() const; bool Terminate(); std::string GetLogOutput() const; diff --git a/src/zenutil/zenserverprocess.cpp b/src/zenutil/zenserverprocess.cpp index ebce9730e..8eaf2cf5b 100644 --- a/src/zenutil/zenserverprocess.cpp +++ b/src/zenutil/zenserverprocess.cpp @@ -1419,7 +1419,7 @@ ZenServerInstance::SetDataDir(std::filesystem::path TestDir) } bool -ZenServerInstance::IsRunning() +ZenServerInstance::IsRunning() const { if (!m_Process.IsValid()) { |