diff options
Diffstat (limited to 'src/zenserver/hub')
| -rw-r--r-- | src/zenserver/hub/README.md | 17 | ||||
| -rw-r--r-- | src/zenserver/hub/httphubservice.cpp | 214 | ||||
| -rw-r--r-- | src/zenserver/hub/httphubservice.h | 17 | ||||
| -rw-r--r-- | src/zenserver/hub/httpproxyhandler.cpp | 528 | ||||
| -rw-r--r-- | src/zenserver/hub/httpproxyhandler.h | 52 | ||||
| -rw-r--r-- | src/zenserver/hub/hub.cpp | 1019 | ||||
| -rw-r--r-- | src/zenserver/hub/hub.h | 86 | ||||
| -rw-r--r-- | src/zenserver/hub/hubinstancestate.cpp | 2 | ||||
| -rw-r--r-- | src/zenserver/hub/hubinstancestate.h | 3 | ||||
| -rw-r--r-- | src/zenserver/hub/hydration.cpp | 2201 | ||||
| -rw-r--r-- | src/zenserver/hub/hydration.h | 71 | ||||
| -rw-r--r-- | src/zenserver/hub/storageserverinstance.cpp | 145 | ||||
| -rw-r--r-- | src/zenserver/hub/storageserverinstance.h | 62 | ||||
| -rw-r--r-- | src/zenserver/hub/zenhubserver.cpp | 477 | ||||
| -rw-r--r-- | src/zenserver/hub/zenhubserver.h | 50 |
15 files changed, 3689 insertions, 1255 deletions
diff --git a/src/zenserver/hub/README.md b/src/zenserver/hub/README.md index 322be3649..c75349fa5 100644 --- a/src/zenserver/hub/README.md +++ b/src/zenserver/hub/README.md @@ -3,23 +3,32 @@ The Zen Server can act in a "hub" mode. In this mode, the only services offered are the basic health and diagnostic services alongside an API to provision and deprovision Storage server instances. +A module ID is an alphanumeric identifier (hyphens allowed) that identifies a dataset, typically +associated with a content plug-in module. + ## Generic Server API GET `/health` - returns an `OK!` payload when all enabled services are up and responding ## Hub API -GET `{moduleid}` - alphanumeric identifier to identify a dataset (typically associated with a content plug-in module) - -GET `/hub/status` - obtain a summary of the currently live instances +GET `/hub/status` - obtain a summary of all currently live instances GET `/hub/modules/{moduleid}` - retrieve information about a module +DELETE `/hub/modules/{moduleid}` - obliterate a module (permanently destroys all data) + POST `/hub/modules/{moduleid}/provision` - provision service for module POST `/hub/modules/{moduleid}/deprovision` - deprovision service for module -GET `/hub/stats` - retrieve stats for service +POST `/hub/modules/{moduleid}/hibernate` - hibernate a provisioned module + +POST `/hub/modules/{moduleid}/wake` - wake a hibernated module + +GET `/stats/hub` - retrieve stats for the hub service + +`/hub/proxy/{port}/{path}` - reverse proxy to a child instance dashboard (all HTTP verbs) ## Hub Configuration diff --git a/src/zenserver/hub/httphubservice.cpp b/src/zenserver/hub/httphubservice.cpp index ebefcf2e3..e4b0c28d0 100644 --- a/src/zenserver/hub/httphubservice.cpp +++ b/src/zenserver/hub/httphubservice.cpp @@ -2,6 +2,7 @@ #include "httphubservice.h" +#include "httpproxyhandler.h" #include "hub.h" #include "storageserverinstance.h" @@ -43,10 +44,11 @@ namespace { } } // namespace -HttpHubService::HttpHubService(Hub& Hub, HttpStatsService& StatsService, HttpStatusService& StatusService) +HttpHubService::HttpHubService(Hub& Hub, HttpProxyHandler& Proxy, HttpStatsService& StatsService, HttpStatusService& StatusService) : m_Hub(Hub) , m_StatsService(StatsService) , m_StatusService(StatusService) +, m_Proxy(Proxy) { using namespace std::literals; @@ -67,6 +69,23 @@ HttpHubService::HttpHubService(Hub& Hub, HttpStatsService& StatsService, HttpSta return true; }); + m_Router.AddMatcher("port", [](std::string_view Str) -> bool { + if (Str.empty()) + { + return false; + } + for (const auto C : Str) + { + if (!std::isdigit(C)) + { + return false; + } + } + return true; + }); + + m_Router.AddMatcher("proxypath", [](std::string_view Str) -> bool { return !Str.empty(); }); + m_Router.RegisterRoute( "status", [this](HttpRouterRequest& Req) { @@ -78,6 +97,10 @@ HttpHubService::HttpHubService(Hub& Hub, HttpStatsService& StatsService, HttpSta Obj << "moduleId" << ModuleId; Obj << "state" << ToString(Info.State); Obj << "port" << Info.Port; + if (Info.StateChangeTime != std::chrono::system_clock::time_point::min()) + { + Obj << "state_change_time" << ToDateTime(Info.StateChangeTime); + } Obj.BeginObject("process_metrics"); { Obj << "MemoryBytes" << Info.Metrics.MemoryBytes; @@ -98,6 +121,11 @@ HttpHubService::HttpHubService(Hub& Hub, HttpStatsService& StatsService, HttpSta HttpVerb::kGet); m_Router.RegisterRoute( + "deprovision", + [this](HttpRouterRequest& Req) { HandleDeprovisionAll(Req.ServerRequest()); }, + HttpVerb::kPost); + + m_Router.RegisterRoute( "modules/{moduleid}", [this](HttpRouterRequest& Req) { std::string_view ModuleId = Req.GetCapture(1); @@ -229,15 +257,23 @@ HttpHubService::HttpHubService(Hub& Hub, HttpStatsService& StatsService, HttpSta HttpVerb::kPost); m_Router.RegisterRoute( - "stats", + "proxy/{port}/{proxypath}", [this](HttpRouterRequest& Req) { - CbObjectWriter Obj; - Obj << "currentInstanceCount" << m_Hub.GetInstanceCount(); - Obj << "maxInstanceCount" << m_Hub.GetMaxInstanceCount(); - Obj << "instanceLimit" << m_Hub.GetConfig().InstanceLimit; - Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Obj.Save()); + std::string_view PortStr = Req.GetCapture(1); + + // Use RelativeUriWithExtension to preserve the file extension that the + // router's URI parser strips (e.g. ".css", ".js") - the upstream server + // needs the full path including the extension. + std::string_view FullUri = Req.ServerRequest().RelativeUriWithExtension(); + std::string_view Prefix = "proxy/"; + + // FullUri is "proxy/{port}/{path...}" - skip past "proxy/{port}/" + size_t PathStart = Prefix.size() + PortStr.size() + 1; + std::string_view PathTail = (PathStart < FullUri.size()) ? FullUri.substr(PathStart) : std::string_view{}; + + m_Proxy.HandleProxyRequest(Req.ServerRequest(), PortStr, PathTail); }, - HttpVerb::kGet); + HttpVerb::kGet | HttpVerb::kPost | HttpVerb::kPut | HttpVerb::kDelete | HttpVerb::kHead); m_StatsService.RegisterHandler("hub", *this); m_StatusService.RegisterHandler("hub", *this); @@ -286,7 +322,37 @@ HttpHubService::HandleStatusRequest(HttpServerRequest& Request) void HttpHubService::HandleStatsRequest(HttpServerRequest& Request) { - Request.WriteResponse(HttpResponseCode::OK, CollectStats()); + CbObjectWriter Cbo; + + EmitSnapshot("requests", m_HttpRequests, Cbo); + + Cbo << "currentInstanceCount" << m_Hub.GetInstanceCount(); + Cbo << "maxInstanceCount" << m_Hub.GetMaxInstanceCount(); + Cbo << "instanceLimit" << m_Hub.GetConfig().InstanceLimit; + + SystemMetrics SysMetrics; + DiskSpace Disk; + m_Hub.GetMachineMetrics(SysMetrics, Disk); + Cbo.BeginObject("machine"); + { + Cbo << "disk_free_bytes" << Disk.Free; + Cbo << "disk_total_bytes" << Disk.Total; + Cbo << "memory_avail_mib" << SysMetrics.AvailSystemMemoryMiB; + Cbo << "memory_total_mib" << SysMetrics.SystemMemoryMiB; + Cbo << "virtual_memory_avail_mib" << SysMetrics.AvailVirtualMemoryMiB; + Cbo << "virtual_memory_total_mib" << SysMetrics.VirtualMemoryMiB; + } + Cbo.EndObject(); + + const ResourceMetrics& Limits = m_Hub.GetConfig().ResourceLimits; + Cbo.BeginObject("resource_limits"); + { + Cbo << "disk_bytes" << Limits.DiskUsageBytes; + Cbo << "memory_bytes" << Limits.MemoryUsageBytes; + } + Cbo.EndObject(); + + Request.WriteResponse(HttpResponseCode::OK, Cbo.Save()); } CbObject @@ -310,6 +376,81 @@ HttpHubService::GetActivityCounter() } void +HttpHubService::HandleDeprovisionAll(HttpServerRequest& Request) +{ + std::vector<std::string> ModulesToDeprovision; + m_Hub.EnumerateModules([&ModulesToDeprovision](std::string_view ModuleId, const Hub::InstanceInfo& InstanceInfo) { + if (InstanceInfo.State == HubInstanceState::Provisioned || InstanceInfo.State == HubInstanceState::Hibernated) + { + ModulesToDeprovision.push_back(std::string(ModuleId)); + } + }); + + if (ModulesToDeprovision.empty()) + { + return Request.WriteResponse(HttpResponseCode::OK); + } + std::vector<std::string> Rejected; + std::vector<std::string> Accepted; + std::vector<std::string> Completed; + for (const std::string& ModuleId : ModulesToDeprovision) + { + Hub::Response Response = m_Hub.Deprovision(ModuleId); + switch (Response.ResponseCode) + { + case Hub::EResponseCode::NotFound: + // Ignore + break; + case Hub::EResponseCode::Rejected: + Rejected.push_back(ModuleId); + break; + case Hub::EResponseCode::Accepted: + Accepted.push_back(ModuleId); + break; + case Hub::EResponseCode::Completed: + Completed.push_back(ModuleId); + break; + } + } + if (Rejected.empty() && Accepted.empty() && Completed.empty()) + { + return Request.WriteResponse(HttpResponseCode::OK); + } + HttpResponseCode Response = HttpResponseCode::OK; + CbObjectWriter Writer; + if (!Completed.empty()) + { + Writer.BeginArray("Completed"); + for (const std::string& ModuleId : Completed) + { + Writer.AddString(ModuleId); + } + Writer.EndArray(); // Completed + } + if (!Accepted.empty()) + { + Writer.BeginArray("Accepted"); + for (const std::string& ModuleId : Accepted) + { + Writer.AddString(ModuleId); + } + Writer.EndArray(); // Accepted + Response = HttpResponseCode::Accepted; + } + if (!Rejected.empty()) + { + Writer.BeginArray("Rejected"); + for (const std::string& ModuleId : Rejected) + { + Writer.AddString(ModuleId); + } + Writer.EndArray(); // Rejected + Response = HttpResponseCode::Conflict; + } + Request.WriteResponse(Response, Writer.Save()); +} + +void HttpHubService::HandleModuleGet(HttpServerRequest& Request, std::string_view ModuleId) { Hub::InstanceInfo InstanceInfo; @@ -328,45 +469,36 @@ HttpHubService::HandleModuleGet(HttpServerRequest& Request, std::string_view Mod void HttpHubService::HandleModuleDelete(HttpServerRequest& Request, std::string_view ModuleId) { - Hub::InstanceInfo InstanceInfo; - if (!m_Hub.Find(ModuleId, &InstanceInfo)) + Hub::Response Resp = m_Hub.Obliterate(std::string(ModuleId)); + + if (HandleFailureResults(Request, Resp)) { - Request.WriteResponse(HttpResponseCode::NotFound); return; } - if (InstanceInfo.State == HubInstanceState::Provisioned || InstanceInfo.State == HubInstanceState::Hibernated || - InstanceInfo.State == HubInstanceState::Crashed) - { - try - { - Hub::Response Resp = m_Hub.Deprovision(std::string(ModuleId)); - - if (HandleFailureResults(Request, Resp)) - { - return; - } - - // TODO: nuke all related storage + const HttpResponseCode HttpCode = + (Resp.ResponseCode == Hub::EResponseCode::Accepted) ? HttpResponseCode::Accepted : HttpResponseCode::OK; + CbObjectWriter Obj; + Obj << "moduleId" << ModuleId; + Request.WriteResponse(HttpCode, Obj.Save()); +} - const HttpResponseCode HttpCode = - (Resp.ResponseCode == Hub::EResponseCode::Accepted) ? HttpResponseCode::Accepted : HttpResponseCode::OK; - CbObjectWriter Obj; - Obj << "moduleId" << ModuleId; - return Request.WriteResponse(HttpCode, Obj.Save()); - } - catch (const std::exception& Ex) - { - ZEN_ERROR("Exception while deprovisioning module '{}': {}", ModuleId, Ex.what()); - throw; - } - } +void +HttpHubService::OnWebSocketOpen(Ref<WebSocketConnection> Connection, std::string_view RelativeUri) +{ + m_Proxy.OnWebSocketOpen(std::move(Connection), RelativeUri); +} - // TODO: nuke all related storage +void +HttpHubService::OnWebSocketMessage(WebSocketConnection& Conn, const WebSocketMessage& Msg) +{ + m_Proxy.OnWebSocketMessage(Conn, Msg); +} - CbObjectWriter Obj; - Obj << "moduleId" << ModuleId; - Request.WriteResponse(HttpResponseCode::OK, Obj.Save()); +void +HttpHubService::OnWebSocketClose(WebSocketConnection& Conn, uint16_t Code, std::string_view Reason) +{ + m_Proxy.OnWebSocketClose(Conn, Code, Reason); } } // namespace zen diff --git a/src/zenserver/hub/httphubservice.h b/src/zenserver/hub/httphubservice.h index 1bb1c303e..f4d1b0b89 100644 --- a/src/zenserver/hub/httphubservice.h +++ b/src/zenserver/hub/httphubservice.h @@ -2,11 +2,16 @@ #pragma once +#include <zencore/thread.h> #include <zenhttp/httpserver.h> #include <zenhttp/httpstatus.h> +#include <zenhttp/websocket.h> + +#include <memory> namespace zen { +class HttpProxyHandler; class HttpStatsService; class Hub; @@ -16,10 +21,10 @@ class Hub; * use in UEFN content worker style scenarios. * */ -class HttpHubService : public HttpService, public IHttpStatusProvider, public IHttpStatsProvider +class HttpHubService : public HttpService, public IHttpStatusProvider, public IHttpStatsProvider, public IWebSocketHandler { public: - HttpHubService(Hub& Hub, HttpStatsService& StatsService, HttpStatusService& StatusService); + HttpHubService(Hub& Hub, HttpProxyHandler& Proxy, HttpStatsService& StatsService, HttpStatusService& StatusService); ~HttpHubService(); HttpHubService(const HttpHubService&) = delete; @@ -32,6 +37,11 @@ public: virtual CbObject CollectStats() override; virtual uint64_t GetActivityCounter() override; + // IWebSocketHandler + void OnWebSocketOpen(Ref<WebSocketConnection> Connection, std::string_view RelativeUri) override; + void OnWebSocketMessage(WebSocketConnection& Conn, const WebSocketMessage& Msg) override; + void OnWebSocketClose(WebSocketConnection& Conn, uint16_t Code, std::string_view Reason) override; + void SetNotificationEndpoint(std::string_view UpstreamNotificationEndpoint, std::string_view InstanceId); private: @@ -43,8 +53,11 @@ private: HttpStatsService& m_StatsService; HttpStatusService& m_StatusService; + void HandleDeprovisionAll(HttpServerRequest& Request); void HandleModuleGet(HttpServerRequest& Request, std::string_view ModuleId); void HandleModuleDelete(HttpServerRequest& Request, std::string_view ModuleId); + + HttpProxyHandler& m_Proxy; }; } // namespace zen diff --git a/src/zenserver/hub/httpproxyhandler.cpp b/src/zenserver/hub/httpproxyhandler.cpp new file mode 100644 index 000000000..235d7388f --- /dev/null +++ b/src/zenserver/hub/httpproxyhandler.cpp @@ -0,0 +1,528 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "httpproxyhandler.h" + +#include <zencore/fmtutils.h> +#include <zencore/logging.h> +#include <zencore/string.h> +#include <zenhttp/httpclient.h> +#include <zenhttp/httpwsclient.h> + +ZEN_THIRD_PARTY_INCLUDES_START +#include <fmt/format.h> +ZEN_THIRD_PARTY_INCLUDES_END + +#include <charconv> + +#if ZEN_WITH_TESTS +# include <zencore/testing.h> +#endif // ZEN_WITH_TESTS + +namespace zen { + +namespace { + + std::string InjectProxyScript(std::string_view Html, uint16_t Port) + { + ExtendableStringBuilder<2048> Script; + Script.Append("<script>\n(function(){\n var P = \"/hub/proxy/"); + Script.Append(fmt::format("{}", Port)); + Script.Append( + "\";\n" + " var OF = window.fetch;\n" + " window.fetch = function(u, o) {\n" + " if (typeof u === \"string\") {\n" + " try {\n" + " var p = new URL(u, location.origin);\n" + " if (p.origin === location.origin && !p.pathname.startsWith(P))\n" + " { p.pathname = P + p.pathname; u = p.toString(); }\n" + " } catch(e) {\n" + " if (u.startsWith(\"/\") && !u.startsWith(P)) u = P + u;\n" + " }\n" + " }\n" + " return OF.call(this, u, o);\n" + " };\n" + " var OW = window.WebSocket;\n" + " window.WebSocket = function(u, pr) {\n" + " try {\n" + " var p = new URL(u);\n" + " if (p.hostname === location.hostname\n" + " && String(p.port || (p.protocol === \"wss:\" ? \"443\" : \"80\"))\n" + " === String(location.port || (location.protocol === \"https:\" ? \"443\" : \"80\"))\n" + " && !p.pathname.startsWith(P))\n" + " { p.pathname = P + p.pathname; u = p.toString(); }\n" + " } catch(e) {}\n" + " return pr !== undefined ? new OW(u, pr) : new OW(u);\n" + " };\n" + " window.WebSocket.prototype = OW.prototype;\n" + " window.WebSocket.CONNECTING = OW.CONNECTING;\n" + " window.WebSocket.OPEN = OW.OPEN;\n" + " window.WebSocket.CLOSING = OW.CLOSING;\n" + " window.WebSocket.CLOSED = OW.CLOSED;\n" + " var OO = window.open;\n" + " window.open = function(u, t, f) {\n" + " if (typeof u === \"string\") {\n" + " try {\n" + " var p = new URL(u, location.origin);\n" + " if (p.origin === location.origin && !p.pathname.startsWith(P))\n" + " { p.pathname = P + p.pathname; u = p.toString(); }\n" + " } catch(e) {}\n" + " }\n" + " return OO.call(this, u, t, f);\n" + " };\n" + " document.addEventListener(\"click\", function(e) {\n" + " var t = e.composedPath ? e.composedPath()[0] : e.target;\n" + " while (t && t.tagName !== \"A\") t = t.parentNode || t.host;\n" + " if (!t || !t.href) return;\n" + " try {\n" + " var h = new URL(t.href);\n" + " if (h.origin === location.origin && !h.pathname.startsWith(P))\n" + " { h.pathname = P + h.pathname; e.preventDefault(); window.location.href = h.toString(); }\n" + " } catch(x) {}\n" + " }, true);\n" + "})();\n</script>"); + + std::string ScriptStr = Script.ToString(); + + size_t HeadClose = Html.find("</head>"); + if (HeadClose != std::string_view::npos) + { + std::string Result; + Result.reserve(Html.size() + ScriptStr.size()); + Result.append(Html.substr(0, HeadClose)); + Result.append(ScriptStr); + Result.append(Html.substr(HeadClose)); + return Result; + } + + std::string Result; + Result.reserve(Html.size() + ScriptStr.size()); + Result.append(ScriptStr); + Result.append(Html); + return Result; + } + +} // namespace + +struct HttpProxyHandler::WsBridge : public RefCounted, public IWsClientHandler +{ + Ref<WebSocketConnection> ClientConn; + std::unique_ptr<HttpWsClient> UpstreamClient; + uint16_t Port = 0; + + void OnWsOpen() override {} + + void OnWsMessage(const WebSocketMessage& Msg) override + { + if (!ClientConn->IsOpen()) + { + return; + } + switch (Msg.Opcode) + { + case WebSocketOpcode::kText: + ClientConn->SendText(std::string_view(static_cast<const char*>(Msg.Payload.GetData()), Msg.Payload.GetSize())); + break; + case WebSocketOpcode::kBinary: + ClientConn->SendBinary(std::span<const uint8_t>(static_cast<const uint8_t*>(Msg.Payload.GetData()), Msg.Payload.GetSize())); + break; + default: + break; + } + } + + void OnWsClose(uint16_t Code, std::string_view Reason) override + { + if (ClientConn->IsOpen()) + { + ClientConn->Close(Code, Reason); + } + } +}; + +HttpProxyHandler::HttpProxyHandler() +{ +} + +HttpProxyHandler::HttpProxyHandler(PortValidator ValidatePort) : m_ValidatePort(std::move(ValidatePort)) +{ +} + +void +HttpProxyHandler::SetPortValidator(PortValidator ValidatePort) +{ + m_ValidatePort = std::move(ValidatePort); +} + +HttpProxyHandler::~HttpProxyHandler() +{ + try + { + Shutdown(); + } + catch (...) + { + } +} + +HttpClient& +HttpProxyHandler::GetOrCreateProxyClient(uint16_t Port) +{ + HttpClient* Result = nullptr; + m_ProxyClientsLock.WithExclusiveLock([&] { + auto It = m_ProxyClients.find(Port); + if (It == m_ProxyClients.end()) + { + HttpClientSettings Settings; + Settings.LogCategory = "hub-proxy"; + Settings.ConnectTimeout = std::chrono::milliseconds(5000); + Settings.Timeout = std::chrono::milliseconds(30000); + auto Client = std::make_unique<HttpClient>(fmt::format("http://127.0.0.1:{}", Port), Settings); + Result = Client.get(); + m_ProxyClients.emplace(Port, std::move(Client)); + } + else + { + Result = It->second.get(); + } + }); + return *Result; +} + +void +HttpProxyHandler::HandleProxyRequest(HttpServerRequest& Request, std::string_view PortStr, std::string_view PathTail) +{ + uint16_t Port = 0; + auto [Ptr, Ec] = std::from_chars(PortStr.data(), PortStr.data() + PortStr.size(), Port); + if (Ec != std::errc{} || Ptr != PortStr.data() + PortStr.size()) + { + Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "invalid proxy URL"); + return; + } + + if (!m_ValidatePort(Port)) + { + Request.WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, "target instance not available"); + return; + } + + HttpClient& Client = GetOrCreateProxyClient(Port); + + std::string RequestPath; + RequestPath.reserve(1 + PathTail.size()); + RequestPath.push_back('/'); + RequestPath.append(PathTail); + + std::string_view QueryString = Request.QueryString(); + if (!QueryString.empty()) + { + RequestPath.push_back('?'); + RequestPath.append(QueryString); + } + + HttpClient::KeyValueMap ForwardHeaders; + HttpContentType AcceptType = Request.AcceptContentType(); + if (AcceptType != HttpContentType::kUnknownContentType) + { + ForwardHeaders->emplace("Accept", std::string(MapContentTypeToString(AcceptType))); + } + + std::string_view Auth = Request.GetAuthorizationHeader(); + if (!Auth.empty()) + { + ForwardHeaders->emplace("Authorization", std::string(Auth)); + } + + HttpContentType ReqContentType = Request.RequestContentType(); + if (ReqContentType != HttpContentType::kUnknownContentType) + { + ForwardHeaders->emplace("Content-Type", std::string(MapContentTypeToString(ReqContentType))); + } + + HttpClient::Response Response; + + switch (Request.RequestVerb()) + { + case HttpVerb::kGet: + Response = Client.Get(RequestPath, ForwardHeaders); + break; + case HttpVerb::kPost: + { + IoBuffer Payload = Request.ReadPayload(); + Response = Client.Post(RequestPath, Payload, ForwardHeaders); + break; + } + case HttpVerb::kPut: + { + IoBuffer Payload = Request.ReadPayload(); + Response = Client.Put(RequestPath, Payload, ForwardHeaders); + break; + } + case HttpVerb::kDelete: + Response = Client.Delete(RequestPath, ForwardHeaders); + break; + case HttpVerb::kHead: + Response = Client.Head(RequestPath, ForwardHeaders); + break; + default: + Request.WriteResponse(HttpResponseCode::MethodNotAllowed, HttpContentType::kText, "method not supported"); + return; + } + + if (Response.Error) + { + if (!m_ValidatePort(Port)) + { + Request.WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, "target instance not available"); + return; + } + + ZEN_WARN("proxy request to port {} failed: {}", Port, Response.Error->ErrorMessage); + switch (Response.Error->ErrorCode) + { + case HttpClientErrorCode::kConnectionFailure: + case HttpClientErrorCode::kHostResolutionFailure: + return Request.WriteResponse(HttpResponseCode::NotFound, + HttpContentType::kText, + fmt::format("instance not reachable: {}", Response.Error->ErrorMessage)); + case HttpClientErrorCode::kOperationTimedOut: + return Request.WriteResponse(HttpResponseCode::GatewayTimeout, + HttpContentType::kText, + fmt::format("upstream request timed out: {}", Response.Error->ErrorMessage)); + case HttpClientErrorCode::kRequestCancelled: + return Request.WriteResponse(HttpResponseCode::ServiceUnavailable, + HttpContentType::kText, + fmt::format("upstream request cancelled: {}", Response.Error->ErrorMessage)); + default: + return Request.WriteResponse(HttpResponseCode::BadGateway, + HttpContentType::kText, + fmt::format("upstream request failed: {}", Response.Error->ErrorMessage)); + } + } + + HttpContentType ContentType = Response.ResponsePayload.GetContentType(); + + if (ContentType == HttpContentType::kHTML) + { + std::string_view Html(static_cast<const char*>(Response.ResponsePayload.GetData()), Response.ResponsePayload.GetSize()); + std::string Injected = InjectProxyScript(Html, Port); + Request.WriteResponse(Response.StatusCode, HttpContentType::kHTML, std::string_view(Injected)); + } + else + { + Request.WriteResponse(Response.StatusCode, ContentType, std::move(Response.ResponsePayload)); + } +} + +void +HttpProxyHandler::PrunePort(uint16_t Port) +{ + m_ProxyClientsLock.WithExclusiveLock([&] { m_ProxyClients.erase(Port); }); + + std::vector<Ref<WsBridge>> Stale; + m_WsBridgesLock.WithExclusiveLock([&] { + for (auto It = m_WsBridges.begin(); It != m_WsBridges.end();) + { + if (It->second->Port == Port) + { + Stale.push_back(std::move(It->second)); + It = m_WsBridges.erase(It); + } + else + { + ++It; + } + } + }); + + for (auto& Bridge : Stale) + { + if (Bridge->UpstreamClient) + { + Bridge->UpstreamClient->Close(1001, "instance shutting down"); + } + if (Bridge->ClientConn->IsOpen()) + { + Bridge->ClientConn->Close(1001, "instance shutting down"); + } + } +} + +void +HttpProxyHandler::Shutdown() +{ + m_WsBridgesLock.WithExclusiveLock([&] { m_WsBridges.clear(); }); + m_ProxyClientsLock.WithExclusiveLock([&] { m_ProxyClients.clear(); }); +} + +////////////////////////////////////////////////////////////////////////// +// +// WebSocket proxy +// + +void +HttpProxyHandler::OnWebSocketOpen(Ref<WebSocketConnection> Connection, std::string_view RelativeUri) +{ + const std::string_view ProxyPrefix = "proxy/"; + if (!RelativeUri.starts_with(ProxyPrefix)) + { + Connection->Close(1008, "unsupported WebSocket endpoint"); + return; + } + + std::string_view ProxyTail = RelativeUri.substr(ProxyPrefix.size()); + + size_t SlashPos = ProxyTail.find('/'); + std::string_view PortStr = (SlashPos != std::string_view::npos) ? ProxyTail.substr(0, SlashPos) : ProxyTail; + std::string_view Path = (SlashPos != std::string_view::npos) ? ProxyTail.substr(SlashPos) : "/"; + + uint16_t Port = 0; + auto [Ptr, Ec] = std::from_chars(PortStr.data(), PortStr.data() + PortStr.size(), Port); + if (Ec != std::errc{} || Ptr != PortStr.data() + PortStr.size()) + { + Connection->Close(1008, "invalid proxy URL"); + return; + } + + if (!m_ValidatePort(Port)) + { + Connection->Close(1008, "target instance not available"); + return; + } + + std::string WsUrl = HttpToWsUrl(fmt::format("http://127.0.0.1:{}", Port), Path); + + Ref<WsBridge> Bridge(new WsBridge()); + Bridge->ClientConn = Connection; + Bridge->Port = Port; + + Bridge->UpstreamClient = std::make_unique<HttpWsClient>(WsUrl, *Bridge); + + try + { + Bridge->UpstreamClient->Connect(); + } + catch (const std::exception& Ex) + { + ZEN_WARN("proxy WebSocket connect to {} failed: {}", WsUrl, Ex.what()); + Connection->Close(1011, "upstream connect failed"); + return; + } + + WebSocketConnection* Key = Connection.Get(); + m_WsBridgesLock.WithExclusiveLock([&] { m_WsBridges.emplace(Key, std::move(Bridge)); }); +} + +void +HttpProxyHandler::OnWebSocketMessage(WebSocketConnection& Conn, const WebSocketMessage& Msg) +{ + Ref<WsBridge> Bridge; + m_WsBridgesLock.WithSharedLock([&] { + auto It = m_WsBridges.find(&Conn); + if (It != m_WsBridges.end()) + { + Bridge = It->second; + } + }); + + if (!Bridge || !Bridge->UpstreamClient) + { + return; + } + + switch (Msg.Opcode) + { + case WebSocketOpcode::kText: + Bridge->UpstreamClient->SendText(std::string_view(static_cast<const char*>(Msg.Payload.GetData()), Msg.Payload.GetSize())); + break; + case WebSocketOpcode::kBinary: + Bridge->UpstreamClient->SendBinary( + std::span<const uint8_t>(static_cast<const uint8_t*>(Msg.Payload.GetData()), Msg.Payload.GetSize())); + break; + case WebSocketOpcode::kClose: + Bridge->UpstreamClient->Close(Msg.CloseCode, {}); + break; + default: + break; + } +} + +void +HttpProxyHandler::OnWebSocketClose(WebSocketConnection& Conn, uint16_t Code, std::string_view Reason) +{ + Ref<WsBridge> Bridge = m_WsBridgesLock.WithExclusiveLock([this, &Conn]() -> Ref<WsBridge> { + auto It = m_WsBridges.find(&Conn); + if (It != m_WsBridges.end()) + { + Ref<WsBridge> Bridge = std::move(It->second); + m_WsBridges.erase(It); + return Bridge; + } + return {}; + }); + + if (Bridge && Bridge->UpstreamClient) + { + Bridge->UpstreamClient->Close(Code, Reason); + } +} + +#if ZEN_WITH_TESTS + +TEST_SUITE_BEGIN("server.httpproxyhandler"); + +TEST_CASE("server.httpproxyhandler.html_injection") +{ + SUBCASE("injects before </head>") + { + std::string Result = InjectProxyScript("<html><head></head><body></body></html>", 21005); + CHECK(Result.find("<script>") != std::string::npos); + CHECK(Result.find("/hub/proxy/21005") != std::string::npos); + size_t ScriptEnd = Result.find("</script>"); + size_t HeadClose = Result.find("</head>"); + REQUIRE(ScriptEnd != std::string::npos); + REQUIRE(HeadClose != std::string::npos); + CHECK(ScriptEnd < HeadClose); + } + + SUBCASE("prepends when no </head>") + { + std::string Result = InjectProxyScript("<body>content</body>", 21005); + CHECK(Result.find("<script>") == 0); + CHECK(Result.find("<body>content</body>") != std::string::npos); + } + + SUBCASE("empty html") + { + std::string Result = InjectProxyScript("", 21005); + CHECK(Result.find("<script>") != std::string::npos); + CHECK(Result.find("/hub/proxy/21005") != std::string::npos); + } + + SUBCASE("preserves original content") + { + std::string_view Html = "<html><head><title>Test</title></head><body><h1>Dashboard</h1></body></html>"; + std::string Result = InjectProxyScript(Html, 21005); + CHECK(Result.find("<title>Test</title>") != std::string::npos); + CHECK(Result.find("<h1>Dashboard</h1>") != std::string::npos); + } +} + +TEST_CASE("server.httpproxyhandler.port_embedding") +{ + std::string Result = InjectProxyScript("<head></head>", 80); + CHECK(Result.find("/hub/proxy/80") != std::string::npos); + + Result = InjectProxyScript("<head></head>", 65535); + CHECK(Result.find("/hub/proxy/65535") != std::string::npos); +} + +TEST_SUITE_END(); + +void +httpproxyhandler_forcelink() +{ +} +#endif // ZEN_WITH_TESTS + +} // namespace zen diff --git a/src/zenserver/hub/httpproxyhandler.h b/src/zenserver/hub/httpproxyhandler.h new file mode 100644 index 000000000..8667c0ca1 --- /dev/null +++ b/src/zenserver/hub/httpproxyhandler.h @@ -0,0 +1,52 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zencore/thread.h> +#include <zenhttp/httpserver.h> +#include <zenhttp/websocket.h> + +#include <functional> +#include <memory> +#include <unordered_map> + +namespace zen { + +class HttpClient; + +class HttpProxyHandler +{ +public: + using PortValidator = std::function<bool(uint16_t)>; + + HttpProxyHandler(); + explicit HttpProxyHandler(PortValidator ValidatePort); + ~HttpProxyHandler(); + + void SetPortValidator(PortValidator ValidatePort); + + HttpProxyHandler(const HttpProxyHandler&) = delete; + HttpProxyHandler& operator=(const HttpProxyHandler&) = delete; + + void HandleProxyRequest(HttpServerRequest& Request, std::string_view PortStr, std::string_view PathTail); + void PrunePort(uint16_t Port); + void Shutdown(); + + void OnWebSocketOpen(Ref<WebSocketConnection> Connection, std::string_view RelativeUri); + void OnWebSocketMessage(WebSocketConnection& Conn, const WebSocketMessage& Msg); + void OnWebSocketClose(WebSocketConnection& Conn, uint16_t Code, std::string_view Reason); + +private: + PortValidator m_ValidatePort; + + HttpClient& GetOrCreateProxyClient(uint16_t Port); + + RwLock m_ProxyClientsLock; + std::unordered_map<uint16_t, std::unique_ptr<HttpClient>> m_ProxyClients; + + struct WsBridge; + RwLock m_WsBridgesLock; + std::unordered_map<WebSocketConnection*, Ref<WsBridge>> m_WsBridges; +}; + +} // namespace zen diff --git a/src/zenserver/hub/hub.cpp b/src/zenserver/hub/hub.cpp index 6c44e2333..978814b46 100644 --- a/src/zenserver/hub/hub.cpp +++ b/src/zenserver/hub/hub.cpp @@ -19,7 +19,6 @@ ZEN_THIRD_PARTY_INCLUDES_START ZEN_THIRD_PARTY_INCLUDES_END #if ZEN_WITH_TESTS -# include <zencore/filesystem.h> # include <zencore/testing.h> # include <zencore/testutils.h> #endif @@ -122,32 +121,84 @@ private: ////////////////////////////////////////////////////////////////////////// -Hub::Hub(const Configuration& Config, - ZenServerEnvironment&& RunEnvironment, - WorkerThreadPool* OptionalWorkerPool, - AsyncModuleStateChangeCallbackFunc&& ModuleStateChangeCallback) +ProcessMetrics +Hub::AtomicProcessMetrics::Load() const +{ + return { + .MemoryBytes = MemoryBytes.load(), + .KernelTimeMs = KernelTimeMs.load(), + .UserTimeMs = UserTimeMs.load(), + .WorkingSetSize = WorkingSetSize.load(), + .PeakWorkingSetSize = PeakWorkingSetSize.load(), + .PagefileUsage = PagefileUsage.load(), + .PeakPagefileUsage = PeakPagefileUsage.load(), + }; +} + +void +Hub::AtomicProcessMetrics::Store(const ProcessMetrics& Metrics) +{ + MemoryBytes.store(Metrics.MemoryBytes); + KernelTimeMs.store(Metrics.KernelTimeMs); + UserTimeMs.store(Metrics.UserTimeMs); + WorkingSetSize.store(Metrics.WorkingSetSize); + PeakWorkingSetSize.store(Metrics.PeakWorkingSetSize); + PagefileUsage.store(Metrics.PagefileUsage); + PeakPagefileUsage.store(Metrics.PeakPagefileUsage); +} + +void +Hub::AtomicProcessMetrics::Reset() +{ + MemoryBytes.store(0); + KernelTimeMs.store(0); + UserTimeMs.store(0); + WorkingSetSize.store(0); + PeakWorkingSetSize.store(0); + PagefileUsage.store(0); + PeakPagefileUsage.store(0); +} + +void +Hub::GetMachineMetrics(SystemMetrics& OutSystemMetrict, DiskSpace& OutDiskSpace) const +{ + m_Lock.WithSharedLock([&]() { + OutSystemMetrict = m_SystemMetrics; + OutDiskSpace = m_DiskSpace; + }); +} + +////////////////////////////////////////////////////////////////////////// + +Hub::Hub(const Configuration& Config, ZenServerEnvironment&& RunEnvironment, AsyncModuleStateChangeCallbackFunc&& ModuleStateChangeCallback) : m_Config(Config) , m_RunEnvironment(std::move(RunEnvironment)) -, m_WorkerPool(OptionalWorkerPool) +, m_WorkerPool(Config.OptionalProvisionWorkerPool) , m_BackgroundWorkLatch(1) , m_ModuleStateChangeCallback(std::move(ModuleStateChangeCallback)) , m_ActiveInstances(Config.InstanceLimit) , m_FreeActiveInstanceIndexes(Config.InstanceLimit) { - m_HostMetrics = GetSystemMetrics(); - m_ResourceLimits.DiskUsageBytes = 1000ull * 1024 * 1024 * 1024; - m_ResourceLimits.MemoryUsageBytes = 16ull * 1024 * 1024 * 1024; + ZEN_ASSERT_FORMAT( + Config.OptionalProvisionWorkerPool != Config.OptionalHydrationWorkerPool || Config.OptionalProvisionWorkerPool == nullptr, + "Provision and hydration worker pools must be distinct to avoid deadlocks"); - if (m_Config.HydrationTargetSpecification.empty()) + HydrationBase::Configuration HydrationConfig; + if (!m_Config.HydrationTargetSpecification.empty()) + { + HydrationConfig.TargetSpecification = m_Config.HydrationTargetSpecification; + } + else if (!m_Config.HydrationOptions) { std::filesystem::path FileHydrationPath = m_RunEnvironment.CreateChildDir("hydration_storage"); ZEN_INFO("using file hydration path: '{}'", FileHydrationPath); - m_HydrationTargetSpecification = fmt::format("file://{}", WideToUtf8(FileHydrationPath.native())); + HydrationConfig.TargetSpecification = fmt::format("file://{}", WideToUtf8(FileHydrationPath.native())); } else { - m_HydrationTargetSpecification = m_Config.HydrationTargetSpecification; + HydrationConfig.Options = m_Config.HydrationOptions; } + m_Hydration = InitHydration(HydrationConfig); m_HydrationTempPath = m_RunEnvironment.CreateChildDir("hydration_temp"); ZEN_INFO("using hydration temp path: '{}'", m_HydrationTempPath); @@ -171,6 +222,9 @@ Hub::Hub(const Configuration& Config, } } #endif + + UpdateMachineMetrics(); + m_WatchDog = std::thread([this]() { WatchDog(); }); } @@ -195,6 +249,9 @@ Hub::Shutdown() { ZEN_INFO("Hub service shutting down, deprovisioning any current instances"); + bool Expected = false; + bool WaitForBackgroundWork = m_ShutdownFlag.compare_exchange_strong(Expected, true); + m_WatchDogEvent.Set(); if (m_WatchDog.joinable()) { @@ -203,8 +260,6 @@ Hub::Shutdown() m_WatchDog = {}; - bool Expected = false; - bool WaitForBackgroundWork = m_ShutdownFlag.compare_exchange_strong(Expected, true); if (WaitForBackgroundWork && m_WorkerPool) { m_BackgroundWorkLatch.CountDown(); @@ -254,7 +309,7 @@ Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo) if (auto It = m_InstanceLookup.find(std::string(ModuleId)); It == m_InstanceLookup.end()) { std::string Reason; - if (!CanProvisionInstance(ModuleId, /* out */ Reason)) + if (!CanProvisionInstanceLocked(ModuleId, /* out */ Reason)) { ZEN_WARN("Cannot provision new storage server instance for module '{}': {}", ModuleId, Reason); @@ -271,12 +326,18 @@ Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo) { auto NewInstance = std::make_unique<StorageServerInstance>( m_RunEnvironment, - StorageServerInstance::Configuration{.BasePort = GetInstanceIndexAssignedPort(ActiveInstanceIndex), - .HydrationTempPath = m_HydrationTempPath, - .HydrationTargetSpecification = m_HydrationTargetSpecification, - .HttpThreadCount = m_Config.InstanceHttpThreadCount, - .CoreLimit = m_Config.InstanceCoreLimit, - .ConfigPath = m_Config.InstanceConfigPath}, + *m_Hydration, + StorageServerInstance::Configuration{.BasePort = GetInstanceIndexAssignedPort(ActiveInstanceIndex), + .StateDir = m_RunEnvironment.CreateChildDir(ModuleId), + .TempDir = m_HydrationTempPath / ModuleId, + .HttpThreadCount = m_Config.InstanceHttpThreadCount, + .CoreLimit = m_Config.InstanceCoreLimit, + .ConfigPath = m_Config.InstanceConfigPath, + .Malloc = m_Config.InstanceMalloc, + .Trace = m_Config.InstanceTrace, + .TraceHost = m_Config.InstanceTraceHost, + .TraceFile = m_Config.InstanceTraceFile, + .OptionalWorkerPool = m_Config.OptionalHydrationWorkerPool}, ModuleId); #if ZEN_PLATFORM_WINDOWS @@ -289,6 +350,7 @@ Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo) Instance = NewInstance->LockExclusive(/*Wait*/ true); m_ActiveInstances[ActiveInstanceIndex].Instance = std::move(NewInstance); + m_ActiveInstances[ActiveInstanceIndex].ProcessMetrics.Reset(); m_InstanceLookup.insert_or_assign(std::string(ModuleId), ActiveInstanceIndex); // Set Provisioning while both hub lock and instance lock are held so that any // concurrent Deprovision sees the in-flight state, not Unprovisioned. @@ -329,11 +391,14 @@ Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo) switch (CurrentState) { case HubInstanceState::Provisioning: + case HubInstanceState::Recovering: + case HubInstanceState::Waking: return Response{EResponseCode::Accepted}; case HubInstanceState::Crashed: case HubInstanceState::Unprovisioned: break; case HubInstanceState::Provisioned: + m_ActiveInstances[ActiveInstanceIndex].LastActivityTime.store(std::chrono::system_clock::now()); return Response{EResponseCode::Completed}; case HubInstanceState::Hibernated: _.ReleaseNow(); @@ -354,6 +419,7 @@ Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo) Instance = {}; if (ActualState == HubInstanceState::Provisioned) { + m_ActiveInstances[ActiveInstanceIndex].LastActivityTime.store(std::chrono::system_clock::now()); return Response{EResponseCode::Completed}; } if (ActualState == HubInstanceState::Provisioning) @@ -540,6 +606,7 @@ Hub::InternalDeprovision(const std::string& ModuleId, std::function<bool(ActiveI switch (CurrentState) { case HubInstanceState::Deprovisioning: + case HubInstanceState::Obliterating: return Response{EResponseCode::Accepted}; case HubInstanceState::Crashed: case HubInstanceState::Hibernated: @@ -585,11 +652,11 @@ Hub::InternalDeprovision(const std::string& ModuleId, std::function<bool(ActiveI try { m_WorkerPool->ScheduleWork( - [this, ModuleId = std::string(ModuleId), ActiveInstanceIndex, Instance = std::move(SharedInstancePtr)]() mutable { + [this, ModuleId = std::string(ModuleId), ActiveInstanceIndex, Instance = std::move(SharedInstancePtr), OldState]() mutable { auto _ = MakeGuard([this]() { m_BackgroundWorkLatch.CountDown(); }); try { - CompleteDeprovision(*Instance, ActiveInstanceIndex); + CompleteDeprovision(*Instance, ActiveInstanceIndex, OldState); } catch (const std::exception& Ex) { @@ -617,20 +684,222 @@ Hub::InternalDeprovision(const std::string& ModuleId, std::function<bool(ActiveI } else { - CompleteDeprovision(Instance, ActiveInstanceIndex); + CompleteDeprovision(Instance, ActiveInstanceIndex, OldState); + } + + return Response{m_WorkerPool ? EResponseCode::Accepted : EResponseCode::Completed}; +} + +Hub::Response +Hub::Obliterate(const std::string& ModuleId) +{ + ZEN_ASSERT(!m_ShutdownFlag.load()); + + StorageServerInstance::ExclusiveLockedPtr Instance; + size_t ActiveInstanceIndex = (size_t)-1; + { + RwLock::ExclusiveLockScope Lock(m_Lock); + + if (auto It = m_InstanceLookup.find(ModuleId); It != m_InstanceLookup.end()) + { + ActiveInstanceIndex = It->second; + ZEN_ASSERT(ActiveInstanceIndex < m_ActiveInstances.size()); + + HubInstanceState CurrentState = m_ActiveInstances[ActiveInstanceIndex].State.load(); + + switch (CurrentState) + { + case HubInstanceState::Obliterating: + return Response{EResponseCode::Accepted}; + case HubInstanceState::Provisioned: + case HubInstanceState::Hibernated: + case HubInstanceState::Crashed: + break; + case HubInstanceState::Deprovisioning: + return Response{EResponseCode::Rejected, + fmt::format("Module '{}' is being deprovisioned, retry after completion", ModuleId)}; + case HubInstanceState::Recovering: + return Response{EResponseCode::Rejected, fmt::format("Module '{}' is currently recovering from a crash", ModuleId)}; + case HubInstanceState::Unprovisioned: + return Response{EResponseCode::Completed}; + default: + return Response{EResponseCode::Rejected, + fmt::format("Module '{}' is currently in state '{}'", ModuleId, ToString(CurrentState))}; + } + + std::unique_ptr<StorageServerInstance>& RawInstance = m_ActiveInstances[ActiveInstanceIndex].Instance; + ZEN_ASSERT(RawInstance != nullptr); + + Instance = RawInstance->LockExclusive(/*Wait*/ true); + } + else + { + // Module not tracked by hub - obliterate backend data directly. + // Covers the deprovisioned case where data was preserved via dehydration. + if (m_ObliteratingInstances.contains(ModuleId)) + { + return Response{EResponseCode::Accepted}; + } + + m_ObliteratingInstances.insert(ModuleId); + Lock.ReleaseNow(); + + if (m_WorkerPool) + { + m_BackgroundWorkLatch.AddCount(1); + try + { + m_WorkerPool->ScheduleWork( + [this, ModuleId = std::string(ModuleId)]() { + auto Guard = MakeGuard([this, ModuleId]() { + m_Lock.WithExclusiveLock([this, ModuleId]() { m_ObliteratingInstances.erase(ModuleId); }); + m_BackgroundWorkLatch.CountDown(); + }); + try + { + ObliterateBackendData(ModuleId); + } + catch (const std::exception& Ex) + { + ZEN_ERROR("Failed async obliterate of untracked module '{}': {}", ModuleId, Ex.what()); + } + }, + WorkerThreadPool::EMode::EnableBacklog); + } + catch (const std::exception& DispatchEx) + { + ZEN_ERROR("Failed to dispatch async obliterate of untracked module '{}': {}", ModuleId, DispatchEx.what()); + m_BackgroundWorkLatch.CountDown(); + { + RwLock::ExclusiveLockScope _(m_Lock); + m_ObliteratingInstances.erase(ModuleId); + } + throw; + } + + return Response{EResponseCode::Accepted}; + } + + auto _ = MakeGuard([this, &ModuleId]() { + RwLock::ExclusiveLockScope _(m_Lock); + m_ObliteratingInstances.erase(ModuleId); + }); + + ObliterateBackendData(ModuleId); + + return Response{EResponseCode::Completed}; + } + } + + HubInstanceState OldState = UpdateInstanceState(Instance, ActiveInstanceIndex, HubInstanceState::Obliterating); + const uint16_t Port = Instance.GetBasePort(); + NotifyStateUpdate(ModuleId, OldState, HubInstanceState::Obliterating, Port, {}); + + if (m_WorkerPool) + { + std::shared_ptr<StorageServerInstance::ExclusiveLockedPtr> SharedInstancePtr = + std::make_shared<StorageServerInstance::ExclusiveLockedPtr>(std::move(Instance)); + + m_BackgroundWorkLatch.AddCount(1); + try + { + m_WorkerPool->ScheduleWork( + [this, ModuleId = std::string(ModuleId), ActiveInstanceIndex, Instance = std::move(SharedInstancePtr)]() mutable { + auto _ = MakeGuard([this]() { m_BackgroundWorkLatch.CountDown(); }); + try + { + CompleteObliterate(*Instance, ActiveInstanceIndex); + } + catch (const std::exception& Ex) + { + ZEN_ERROR("Failed async obliterate of module '{}': {}", ModuleId, Ex.what()); + } + }, + WorkerThreadPool::EMode::EnableBacklog); + } + catch (const std::exception& DispatchEx) + { + ZEN_ERROR("Failed async dispatch obliterate of module '{}': {}", ModuleId, DispatchEx.what()); + m_BackgroundWorkLatch.CountDown(); + + NotifyStateUpdate(ModuleId, HubInstanceState::Obliterating, OldState, Port, {}); + { + RwLock::ExclusiveLockScope HubLock(m_Lock); + ZEN_ASSERT_SLOW(m_InstanceLookup.find(std::string(ModuleId)) != m_InstanceLookup.end()); + ZEN_ASSERT_SLOW(m_InstanceLookup.find(std::string(ModuleId))->second == ActiveInstanceIndex); + UpdateInstanceState(HubLock, ActiveInstanceIndex, OldState); + } + + throw; + } + } + else + { + CompleteObliterate(Instance, ActiveInstanceIndex); } return Response{m_WorkerPool ? EResponseCode::Accepted : EResponseCode::Completed}; } void -Hub::CompleteDeprovision(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex) +Hub::CompleteObliterate(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex) { const std::string ModuleId(Instance.GetModuleId()); const uint16_t Port = Instance.GetBasePort(); try { + Instance.Obliterate(); + } + catch (const std::exception& Ex) + { + ZEN_ERROR("Failed to obliterate storage server instance for module '{}': {}", ModuleId, Ex.what()); + Instance = {}; + { + RwLock::ExclusiveLockScope HubLock(m_Lock); + UpdateInstanceState(HubLock, ActiveInstanceIndex, HubInstanceState::Crashed); + } + NotifyStateUpdate(ModuleId, HubInstanceState::Obliterating, HubInstanceState::Crashed, Port, {}); + throw; + } + + NotifyStateUpdate(ModuleId, HubInstanceState::Obliterating, HubInstanceState::Unprovisioned, Port, {}); + RemoveInstance(Instance, ActiveInstanceIndex, ModuleId); +} + +void +Hub::CompleteDeprovision(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex, HubInstanceState OldState) +{ + const std::string ModuleId(Instance.GetModuleId()); + const uint16_t Port = Instance.GetBasePort(); + + try + { + if (OldState == HubInstanceState::Provisioned) + { + ZEN_INFO("Triggering GC for module {}", ModuleId); + + HttpClient GcClient(fmt::format("http://localhost:{}", Port)); + + HttpClient::KeyValueMap Params; + Params.Entries.insert({"smallobjects", "true"}); + Params.Entries.insert({"skipcid", "false"}); + HttpClient::Response Response = GcClient.Post("/admin/gc", HttpClient::Accept(HttpContentType::kCbObject), Params); + Stopwatch Timer; + while (Response && Timer.GetElapsedTimeMs() < 5000) + { + Response = GcClient.Get("/admin/gc", HttpClient::Accept(HttpContentType::kCbObject)); + if (Response) + { + bool Complete = Response.AsObject()["Status"].AsString() != "Running"; + if (Complete) + { + break; + } + Sleep(50); + } + } + } Instance.Deprovision(); } catch (const std::exception& Ex) @@ -649,20 +918,7 @@ Hub::CompleteDeprovision(StorageServerInstance::ExclusiveLockedPtr& Instance, si } NotifyStateUpdate(ModuleId, HubInstanceState::Deprovisioning, HubInstanceState::Unprovisioned, Port, {}); - Instance = {}; - - std::unique_ptr<StorageServerInstance> DeleteInstance; - { - RwLock::ExclusiveLockScope HubLock(m_Lock); - auto It = m_InstanceLookup.find(std::string(ModuleId)); - ZEN_ASSERT_SLOW(It != m_InstanceLookup.end()); - ZEN_ASSERT_SLOW(It->second == ActiveInstanceIndex); - DeleteInstance = std::move(m_ActiveInstances[ActiveInstanceIndex].Instance); - m_FreeActiveInstanceIndexes.push_back(ActiveInstanceIndex); - m_InstanceLookup.erase(It); - UpdateInstanceState(HubLock, ActiveInstanceIndex, HubInstanceState::Unprovisioned); - } - DeleteInstance.reset(); + RemoveInstance(Instance, ActiveInstanceIndex, ModuleId); } Hub::Response @@ -935,6 +1191,46 @@ Hub::CompleteWake(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t Ac } } +void +Hub::RemoveInstance(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex, std::string_view ModuleId) +{ + Instance = {}; + + std::unique_ptr<StorageServerInstance> DeleteInstance; + { + RwLock::ExclusiveLockScope HubLock(m_Lock); + auto It = m_InstanceLookup.find(std::string(ModuleId)); + ZEN_ASSERT_SLOW(It != m_InstanceLookup.end()); + ZEN_ASSERT_SLOW(It->second == ActiveInstanceIndex); + DeleteInstance = std::move(m_ActiveInstances[ActiveInstanceIndex].Instance); + m_FreeActiveInstanceIndexes.push_back(ActiveInstanceIndex); + m_InstanceLookup.erase(It); + UpdateInstanceState(HubLock, ActiveInstanceIndex, HubInstanceState::Unprovisioned); + } + DeleteInstance.reset(); +} + +void +Hub::ObliterateBackendData(std::string_view ModuleId) +{ + std::filesystem::path ServerStateDir = m_RunEnvironment.GetChildBaseDir() / ModuleId; + std::filesystem::path TempDir = m_HydrationTempPath / ModuleId; + + std::atomic<bool> AbortFlag{false}; + std::atomic<bool> PauseFlag{false}; + + HydrationConfig Config{.ServerStateDir = ServerStateDir, .TempDir = TempDir, .ModuleId = std::string(ModuleId)}; + if (m_Config.OptionalHydrationWorkerPool) + { + Config.Threading.emplace(HydrationConfig::ThreadingOptions{.WorkerPool = m_Config.OptionalHydrationWorkerPool, + .AbortFlag = &AbortFlag, + .PauseFlag = &PauseFlag}); + } + + std::unique_ptr<HydrationStrategyBase> Hydrator = m_Hydration->CreateHydrator(Config); + Hydrator->Obliterate(); +} + bool Hub::Find(std::string_view ModuleId, InstanceInfo* OutInstanceInfo) { @@ -947,12 +1243,10 @@ Hub::Find(std::string_view ModuleId, InstanceInfo* OutInstanceInfo) ZEN_ASSERT(ActiveInstanceIndex < m_ActiveInstances.size()); const std::unique_ptr<StorageServerInstance>& Instance = m_ActiveInstances[ActiveInstanceIndex].Instance; ZEN_ASSERT(Instance); - InstanceInfo Info{ - m_ActiveInstances[ActiveInstanceIndex].State.load(), - std::chrono::system_clock::now() // TODO - }; - Instance->GetProcessMetrics(Info.Metrics); - Info.Port = Instance->GetBasePort(); + InstanceInfo Info{m_ActiveInstances[ActiveInstanceIndex].State.load(), + m_ActiveInstances[ActiveInstanceIndex].StateChangeTime.load()}; + Info.Metrics = m_ActiveInstances[ActiveInstanceIndex].ProcessMetrics.Load(); + Info.Port = Instance->GetBasePort(); *OutInstanceInfo = Info; } @@ -971,12 +1265,10 @@ Hub::EnumerateModules(std::function<void(std::string_view ModuleId, const Instan { const std::unique_ptr<StorageServerInstance>& Instance = m_ActiveInstances[ActiveInstanceIndex].Instance; ZEN_ASSERT(Instance); - InstanceInfo Info{ - m_ActiveInstances[ActiveInstanceIndex].State.load(), - std::chrono::system_clock::now() // TODO - }; - Instance->GetProcessMetrics(Info.Metrics); - Info.Port = Instance->GetBasePort(); + InstanceInfo Info{m_ActiveInstances[ActiveInstanceIndex].State.load(), + m_ActiveInstances[ActiveInstanceIndex].StateChangeTime.load()}; + Info.Metrics = m_ActiveInstances[ActiveInstanceIndex].ProcessMetrics.Load(); + Info.Port = Instance->GetBasePort(); Infos.push_back(std::make_pair(std::string(Instance->GetModuleId()), Info)); } @@ -994,30 +1286,15 @@ Hub::GetInstanceCount() return m_Lock.WithSharedLock([this]() { return gsl::narrow_cast<int>(m_InstanceLookup.size()); }); } -void -Hub::UpdateCapacityMetrics() -{ - m_HostMetrics = GetSystemMetrics(); - - // TODO: Should probably go into WatchDog and use atomic for update so it can be read without locks... - // Per-instance stats are already refreshed by WatchDog and are readable via the Find and EnumerateModules -} - -void -Hub::UpdateStats() -{ - int CurrentInstanceCount = m_Lock.WithSharedLock([this] { return gsl::narrow_cast<int>(m_InstanceLookup.size()); }); - int CurrentMaxCount = m_MaxInstanceCount.load(); - - int NewMax = Max(CurrentMaxCount, CurrentInstanceCount); - - m_MaxInstanceCount.compare_exchange_weak(CurrentMaxCount, NewMax); -} - bool -Hub::CanProvisionInstance(std::string_view ModuleId, std::string& OutReason) +Hub::CanProvisionInstanceLocked(std::string_view ModuleId, std::string& OutReason) { - ZEN_UNUSED(ModuleId); + if (m_ObliteratingInstances.contains(std::string(ModuleId))) + { + OutReason = fmt::format("module '{}' is being obliterated", ModuleId); + return false; + } + if (m_FreeActiveInstanceIndexes.empty()) { OutReason = fmt::format("instance limit ({}) exceeded", m_Config.InstanceLimit); @@ -1025,7 +1302,24 @@ Hub::CanProvisionInstance(std::string_view ModuleId, std::string& OutReason) return false; } - // TODO: handle additional resource metrics + const uint64_t DiskUsedBytes = m_DiskSpace.Free <= m_DiskSpace.Total ? m_DiskSpace.Total - m_DiskSpace.Free : 0; + if (m_Config.ResourceLimits.DiskUsageBytes > 0 && DiskUsedBytes > m_Config.ResourceLimits.DiskUsageBytes) + { + OutReason = + fmt::format("disk usage ({}) exceeds ({})", NiceBytes(DiskUsedBytes), NiceBytes(m_Config.ResourceLimits.DiskUsageBytes)); + return false; + } + + const uint64_t RamUsedMiB = m_SystemMetrics.AvailSystemMemoryMiB <= m_SystemMetrics.SystemMemoryMiB + ? m_SystemMetrics.SystemMemoryMiB - m_SystemMetrics.AvailSystemMemoryMiB + : 0; + const uint64_t RamUsedBytes = RamUsedMiB * 1024 * 1024; + if (m_Config.ResourceLimits.MemoryUsageBytes > 0 && RamUsedBytes > m_Config.ResourceLimits.MemoryUsageBytes) + { + OutReason = + fmt::format("ram usage ({}) exceeds ({})", NiceBytes(RamUsedBytes), NiceBytes(m_Config.ResourceLimits.MemoryUsageBytes)); + return false; + } return true; } @@ -1036,6 +1330,21 @@ Hub::GetInstanceIndexAssignedPort(size_t ActiveInstanceIndex) const return gsl::narrow<uint16_t>(m_Config.BasePortNumber + ActiveInstanceIndex); } +bool +Hub::IsInstancePort(uint16_t Port) const +{ + if (Port < m_Config.BasePortNumber) + { + return false; + } + size_t Index = Port - m_Config.BasePortNumber; + if (Index >= m_ActiveInstances.size()) + { + return false; + } + return m_ActiveInstances[Index].State.load(std::memory_order_relaxed) != HubInstanceState::Unprovisioned; +} + HubInstanceState Hub::UpdateInstanceStateLocked(size_t ActiveInstanceIndex, HubInstanceState NewState) { @@ -1046,11 +1355,13 @@ Hub::UpdateInstanceStateLocked(size_t ActiveInstanceIndex, HubInstanceState NewS case HubInstanceState::Unprovisioned: return To == HubInstanceState::Provisioning; case HubInstanceState::Provisioned: - return To == HubInstanceState::Hibernating || To == HubInstanceState::Deprovisioning || To == HubInstanceState::Crashed; + return To == HubInstanceState::Hibernating || To == HubInstanceState::Deprovisioning || To == HubInstanceState::Crashed || + To == HubInstanceState::Obliterating; case HubInstanceState::Hibernated: - return To == HubInstanceState::Waking || To == HubInstanceState::Deprovisioning; + return To == HubInstanceState::Waking || To == HubInstanceState::Deprovisioning || To == HubInstanceState::Obliterating; case HubInstanceState::Crashed: - return To == HubInstanceState::Provisioning || To == HubInstanceState::Deprovisioning || To == HubInstanceState::Recovering; + return To == HubInstanceState::Provisioning || To == HubInstanceState::Deprovisioning || + To == HubInstanceState::Recovering || To == HubInstanceState::Obliterating; case HubInstanceState::Provisioning: return To == HubInstanceState::Provisioned || To == HubInstanceState::Unprovisioned || To == HubInstanceState::Crashed; case HubInstanceState::Hibernating: @@ -1062,11 +1373,15 @@ Hub::UpdateInstanceStateLocked(size_t ActiveInstanceIndex, HubInstanceState NewS To == HubInstanceState::Crashed; case HubInstanceState::Recovering: return To == HubInstanceState::Provisioned || To == HubInstanceState::Unprovisioned; + case HubInstanceState::Obliterating: + return To == HubInstanceState::Unprovisioned || To == HubInstanceState::Crashed; } return false; }(m_ActiveInstances[ActiveInstanceIndex].State.load(), NewState)); + const std::chrono::system_clock::time_point Now = std::chrono::system_clock::now(); m_ActiveInstances[ActiveInstanceIndex].LastKnownActivitySum.store(0); - m_ActiveInstances[ActiveInstanceIndex].LastActivityTime.store(std::chrono::system_clock::now()); + m_ActiveInstances[ActiveInstanceIndex].LastActivityTime.store(Now); + m_ActiveInstances[ActiveInstanceIndex].StateChangeTime.store(Now); return m_ActiveInstances[ActiveInstanceIndex].State.exchange(NewState); } @@ -1075,10 +1390,14 @@ Hub::AttemptRecoverInstance(std::string_view ModuleId) { StorageServerInstance::ExclusiveLockedPtr Instance; size_t ActiveInstanceIndex = (size_t)-1; - { RwLock::ExclusiveLockScope _(m_Lock); + if (m_ShutdownFlag.load()) + { + return; + } + auto It = m_InstanceLookup.find(std::string(ModuleId)); if (It == m_InstanceLookup.end()) { @@ -1173,14 +1492,14 @@ Hub::CheckInstanceStatus(HttpClient& ActivityCheckClient, StorageServerInstance::SharedLockedPtr&& LockedInstance, size_t ActiveInstanceIndex) { + const std::string ModuleId(LockedInstance.GetModuleId()); + HubInstanceState InstanceState = m_ActiveInstances[ActiveInstanceIndex].State.load(); if (LockedInstance.IsRunning()) { - LockedInstance.UpdateMetrics(); + m_ActiveInstances[ActiveInstanceIndex].ProcessMetrics.Store(LockedInstance.GetProcessMetrics()); if (InstanceState == HubInstanceState::Provisioned) { - const std::string ModuleId(LockedInstance.GetModuleId()); - const uint16_t Port = LockedInstance.GetBasePort(); const uint64_t PreviousActivitySum = m_ActiveInstances[ActiveInstanceIndex].LastKnownActivitySum.load(); const std::chrono::system_clock::time_point LastActivityTime = m_ActiveInstances[ActiveInstanceIndex].LastActivityTime.load(); @@ -1260,8 +1579,7 @@ Hub::CheckInstanceStatus(HttpClient& ActivityCheckClient, else if (InstanceState == HubInstanceState::Provisioned) { // Process is not running but state says it should be - instance died unexpectedly. - const std::string ModuleId(LockedInstance.GetModuleId()); - const uint16_t Port = LockedInstance.GetBasePort(); + const uint16_t Port = LockedInstance.GetBasePort(); UpdateInstanceState(LockedInstance, ActiveInstanceIndex, HubInstanceState::Crashed); NotifyStateUpdate(ModuleId, HubInstanceState::Provisioned, HubInstanceState::Crashed, Port, {}); LockedInstance = {}; @@ -1272,7 +1590,6 @@ Hub::CheckInstanceStatus(HttpClient& ActivityCheckClient, { // Process is not running - no HTTP activity check is possible. // Use a pure time-based check; the margin window does not apply here. - const std::string ModuleId = std::string(LockedInstance.GetModuleId()); const std::chrono::system_clock::time_point LastActivityTime = m_ActiveInstances[ActiveInstanceIndex].LastActivityTime.load(); const uint64_t PreviousActivitySum = m_ActiveInstances[ActiveInstanceIndex].LastKnownActivitySum.load(); const std::chrono::system_clock::time_point Now = std::chrono::system_clock::now(); @@ -1304,7 +1621,7 @@ Hub::CheckInstanceStatus(HttpClient& ActivityCheckClient, } else { - // transitional state (Provisioning, Deprovisioning, Hibernating, Waking, Recovering) - expected, skip. + // transitional state (Provisioning, Deprovisioning, Hibernating, Waking, Recovering, Obliterating) - expected, skip. // Crashed is handled above via AttemptRecoverInstance; it appears here only when the instance // lock was busy on a previous cycle and recovery is already pending. return true; @@ -1312,6 +1629,43 @@ Hub::CheckInstanceStatus(HttpClient& ActivityCheckClient, } void +Hub::UpdateMachineMetrics() +{ + try + { + bool DiskSpaceOk = false; + DiskSpace Disk; + + std::filesystem::path ChildDir = m_RunEnvironment.GetChildBaseDir(); + if (!ChildDir.empty()) + { + if (DiskSpaceInfo(ChildDir, Disk)) + { + DiskSpaceOk = true; + } + else + { + ZEN_WARN("Failed to query disk space for '{}'; disk-based provisioning limits will not be enforced", ChildDir); + } + } + + SystemMetrics Metrics = GetSystemMetrics(); + + m_Lock.WithExclusiveLock([&]() { + if (DiskSpaceOk) + { + m_DiskSpace = Disk; + } + m_SystemMetrics = Metrics; + }); + } + catch (const std::exception& Ex) + { + ZEN_WARN("Failed to update machine metrics. Reason: {}", Ex.what()); + } +} + +void Hub::WatchDog() { const uint64_t CycleIntervalMs = std::chrono::duration_cast<std::chrono::milliseconds>(m_Config.WatchDog.CycleInterval).count(); @@ -1326,16 +1680,18 @@ Hub::WatchDog() [&]() -> bool { return m_WatchDogEvent.Wait(0); }); size_t CheckInstanceIndex = SIZE_MAX; // first increment wraps to 0 - while (!m_WatchDogEvent.Wait(gsl::narrow<int>(CycleIntervalMs))) + while (!m_ShutdownFlag.load() && !m_WatchDogEvent.Wait(gsl::narrow<int>(CycleIntervalMs))) { try { + UpdateMachineMetrics(); + // Snapshot slot count. We iterate all slots (including freed nulls) so // round-robin coverage is not skewed by deprovisioned entries. size_t SlotsRemaining = m_Lock.WithSharedLock([this]() { return m_ActiveInstances.size(); }); Stopwatch Timer; - bool ShuttingDown = false; + bool ShuttingDown = m_ShutdownFlag.load(); while (SlotsRemaining > 0 && Timer.GetElapsedTimeMs() < CycleProcessingBudgetMs && !ShuttingDown) { StorageServerInstance::SharedLockedPtr LockedInstance; @@ -1366,16 +1722,24 @@ Hub::WatchDog() std::string ModuleId(LockedInstance.GetModuleId()); - bool InstanceIsOk = CheckInstanceStatus(ActivityCheckClient, std::move(LockedInstance), CheckInstanceIndex); - if (InstanceIsOk) + try { - ShuttingDown = m_WatchDogEvent.Wait(gsl::narrow<int>(InstanceCheckThrottleMs)); + bool InstanceIsOk = CheckInstanceStatus(ActivityCheckClient, std::move(LockedInstance), CheckInstanceIndex); + if (InstanceIsOk) + { + ShuttingDown = m_WatchDogEvent.Wait(gsl::narrow<int>(InstanceCheckThrottleMs)); + } + else + { + ZEN_WARN("Instance for module '{}' is not running, attempting recovery", ModuleId); + AttemptRecoverInstance(ModuleId); + } } - else + catch (const std::exception& Ex) { - ZEN_WARN("Instance for module '{}' is not running, attempting recovery", ModuleId); - AttemptRecoverInstance(ModuleId); + ZEN_WARN("Failed to check status of module {}. Reason: {}", ModuleId, Ex.what()); } + ShuttingDown |= m_ShutdownFlag.load(); } } catch (const std::exception& Ex) @@ -1417,6 +1781,14 @@ static const HttpClientSettings kFastTimeout{.ConnectTimeout = std::chrono::mill namespace hub_testutils { + struct TestHubPools + { + WorkerThreadPool ProvisionPool; + WorkerThreadPool HydrationPool; + + explicit TestHubPools(int ThreadCount) : ProvisionPool(ThreadCount, "hub_test_prov"), HydrationPool(ThreadCount, "hub_test_hydr") {} + }; + ZenServerEnvironment MakeHubEnvironment(const std::filesystem::path& BaseDir) { return ZenServerEnvironment(ZenServerEnvironment::Hub, GetRunningExecutablePath().parent_path(), BaseDir); @@ -1425,9 +1797,14 @@ namespace hub_testutils { std::unique_ptr<Hub> MakeHub(const std::filesystem::path& BaseDir, Hub::Configuration Config = {}, Hub::AsyncModuleStateChangeCallbackFunc StateChangeCallback = {}, - WorkerThreadPool* WorkerPool = nullptr) + TestHubPools* Pools = nullptr) { - return std::make_unique<Hub>(Config, MakeHubEnvironment(BaseDir), WorkerPool, std::move(StateChangeCallback)); + if (Pools) + { + Config.OptionalProvisionWorkerPool = &Pools->ProvisionPool; + Config.OptionalHydrationWorkerPool = &Pools->HydrationPool; + } + return std::make_unique<Hub>(Config, MakeHubEnvironment(BaseDir), std::move(StateChangeCallback)); } struct CallbackRecord @@ -1499,14 +1876,32 @@ namespace hub_testutils { } // namespace hub_testutils -TEST_CASE("hub.provision_basic") +TEST_CASE("hub.provision") { ScopedTemporaryDirectory TempDir; - std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path()); + + struct TransitionRecord + { + HubInstanceState OldState; + HubInstanceState NewState; + }; + RwLock CaptureMutex; + std::vector<TransitionRecord> Transitions; + + hub_testutils::StateChangeCapture CaptureInstance; + + auto CaptureFunc = + [&](std::string_view ModuleId, const HubProvisionedInstanceInfo& Info, HubInstanceState OldState, HubInstanceState NewState) { + CaptureMutex.WithExclusiveLock([&]() { Transitions.push_back({OldState, NewState}); }); + CaptureInstance.CaptureFunc()(ModuleId, Info, OldState, NewState); + }; + + std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), {}, std::move(CaptureFunc)); CHECK_EQ(HubInstance->GetInstanceCount(), 0); CHECK_FALSE(HubInstance->Find("module_a")); + // Provision HubProvisionedInstanceInfo Info; const Hub::Response ProvisionResult = HubInstance->Provision("module_a", Info); REQUIRE_MESSAGE(ProvisionResult.ResponseCode == Hub::EResponseCode::Completed, ProvisionResult.Message); @@ -1515,12 +1910,23 @@ TEST_CASE("hub.provision_basic") Hub::InstanceInfo InstanceInfo; REQUIRE(HubInstance->Find("module_a", &InstanceInfo)); CHECK_EQ(InstanceInfo.State, HubInstanceState::Provisioned); + CHECK_NE(InstanceInfo.StateChangeTime, std::chrono::system_clock::time_point::min()); + CHECK_LE(InstanceInfo.StateChangeTime, std::chrono::system_clock::now()); { HttpClient ModClient(fmt::format("http://localhost:{}", Info.Port), kFastTimeout); CHECK(ModClient.Get("/health/")); } + // Verify provision callback + { + RwLock::SharedLockScope _(CaptureInstance.CallbackMutex); + REQUIRE_EQ(CaptureInstance.ProvisionCallbacks.size(), 1u); + CHECK_EQ(CaptureInstance.ProvisionCallbacks[0].ModuleId, "module_a"); + CHECK_EQ(CaptureInstance.ProvisionCallbacks[0].Port, Info.Port); + } + + // Deprovision const Hub::Response DeprovisionResult = HubInstance->Deprovision("module_a"); CHECK(DeprovisionResult.ResponseCode == Hub::EResponseCode::Completed); CHECK_EQ(HubInstance->GetInstanceCount(), 0); @@ -1530,6 +1936,28 @@ TEST_CASE("hub.provision_basic") HttpClient ModClient(fmt::format("http://localhost:{}", Info.Port), kFastTimeout); CHECK(!ModClient.Get("/health/")); } + + // Verify deprovision callback + { + RwLock::SharedLockScope _(CaptureInstance.CallbackMutex); + REQUIRE_EQ(CaptureInstance.DeprovisionCallbacks.size(), 1u); + CHECK_EQ(CaptureInstance.DeprovisionCallbacks[0].ModuleId, "module_a"); + CHECK_EQ(CaptureInstance.DeprovisionCallbacks[0].Port, Info.Port); + } + + // Verify full transition sequence + { + RwLock::SharedLockScope _(CaptureMutex); + REQUIRE_EQ(Transitions.size(), 4u); + CHECK_EQ(Transitions[0].OldState, HubInstanceState::Unprovisioned); + CHECK_EQ(Transitions[0].NewState, HubInstanceState::Provisioning); + CHECK_EQ(Transitions[1].OldState, HubInstanceState::Provisioning); + CHECK_EQ(Transitions[1].NewState, HubInstanceState::Provisioned); + CHECK_EQ(Transitions[2].OldState, HubInstanceState::Provisioned); + CHECK_EQ(Transitions[2].NewState, HubInstanceState::Deprovisioning); + CHECK_EQ(Transitions[3].OldState, HubInstanceState::Deprovisioning); + CHECK_EQ(Transitions[3].NewState, HubInstanceState::Unprovisioned); + } } TEST_CASE("hub.provision_config") @@ -1582,92 +2010,6 @@ TEST_CASE("hub.provision_config") } } -TEST_CASE("hub.provision_callbacks") -{ - ScopedTemporaryDirectory TempDir; - - hub_testutils::StateChangeCapture CaptureInstance; - - std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), {}, CaptureInstance.CaptureFunc()); - - HubProvisionedInstanceInfo Info; - - const Hub::Response ProvisionResult = HubInstance->Provision("cb_module", Info); - REQUIRE_MESSAGE(ProvisionResult.ResponseCode == Hub::EResponseCode::Completed, ProvisionResult.Message); - - { - RwLock::SharedLockScope _(CaptureInstance.CallbackMutex); - REQUIRE_EQ(CaptureInstance.ProvisionCallbacks.size(), 1u); - CHECK_EQ(CaptureInstance.ProvisionCallbacks[0].ModuleId, "cb_module"); - CHECK_EQ(CaptureInstance.ProvisionCallbacks[0].Port, Info.Port); - CHECK_NE(CaptureInstance.ProvisionCallbacks[0].Port, 0); - } - - { - HttpClient ModClient(fmt::format("http://localhost:{}", Info.Port), kFastTimeout); - CHECK(ModClient.Get("/health/")); - } - - const Hub::Response DeprovisionResult = HubInstance->Deprovision("cb_module"); - CHECK(DeprovisionResult.ResponseCode == Hub::EResponseCode::Completed); - - { - HttpClient ModClient(fmt::format("http://localhost:{}", Info.Port), kFastTimeout); - CHECK(!ModClient.Get("/health/")); - } - - { - RwLock::SharedLockScope _(CaptureInstance.CallbackMutex); - REQUIRE_EQ(CaptureInstance.DeprovisionCallbacks.size(), 1u); - CHECK_EQ(CaptureInstance.DeprovisionCallbacks[0].ModuleId, "cb_module"); - CHECK_EQ(CaptureInstance.DeprovisionCallbacks[0].Port, Info.Port); - CHECK_EQ(CaptureInstance.DeprovisionCallbacks.size(), 1u); - } -} - -TEST_CASE("hub.provision_callback_sequence") -{ - ScopedTemporaryDirectory TempDir; - - struct TransitionRecord - { - HubInstanceState OldState; - HubInstanceState NewState; - }; - RwLock CaptureMutex; - std::vector<TransitionRecord> Transitions; - - auto CaptureFunc = - [&](std::string_view ModuleId, const HubProvisionedInstanceInfo& Info, HubInstanceState OldState, HubInstanceState NewState) { - ZEN_UNUSED(ModuleId); - ZEN_UNUSED(Info); - CaptureMutex.WithExclusiveLock([&]() { Transitions.push_back({OldState, NewState}); }); - }; - - std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), {}, std::move(CaptureFunc)); - - HubProvisionedInstanceInfo Info; - { - const Hub::Response R = HubInstance->Provision("seq_module", Info); - REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message); - } - { - const Hub::Response R = HubInstance->Deprovision("seq_module"); - REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message); - } - - RwLock::SharedLockScope _(CaptureMutex); - REQUIRE_EQ(Transitions.size(), 4u); - CHECK_EQ(Transitions[0].OldState, HubInstanceState::Unprovisioned); - CHECK_EQ(Transitions[0].NewState, HubInstanceState::Provisioning); - CHECK_EQ(Transitions[1].OldState, HubInstanceState::Provisioning); - CHECK_EQ(Transitions[1].NewState, HubInstanceState::Provisioned); - CHECK_EQ(Transitions[2].OldState, HubInstanceState::Provisioned); - CHECK_EQ(Transitions[2].NewState, HubInstanceState::Deprovisioning); - CHECK_EQ(Transitions[3].OldState, HubInstanceState::Deprovisioning); - CHECK_EQ(Transitions[3].NewState, HubInstanceState::Unprovisioned); -} - TEST_CASE("hub.instance_limit") { ScopedTemporaryDirectory TempDir; @@ -1699,54 +2041,7 @@ TEST_CASE("hub.instance_limit") CHECK_EQ(HubInstance->GetInstanceCount(), 2); } -TEST_CASE("hub.enumerate_modules") -{ - ScopedTemporaryDirectory TempDir; - std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path()); - - HubProvisionedInstanceInfo Info; - - { - const Hub::Response R = HubInstance->Provision("enum_a", Info); - REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message); - } - { - const Hub::Response R = HubInstance->Provision("enum_b", Info); - REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message); - } - - std::vector<std::string> Ids; - int ProvisionedCount = 0; - HubInstance->EnumerateModules([&](std::string_view ModuleId, const Hub::InstanceInfo& InstanceInfo) { - Ids.push_back(std::string(ModuleId)); - if (InstanceInfo.State == HubInstanceState::Provisioned) - { - ProvisionedCount++; - } - }); - CHECK_EQ(Ids.size(), 2u); - CHECK_EQ(ProvisionedCount, 2); - const bool FoundA = std::find(Ids.begin(), Ids.end(), "enum_a") != Ids.end(); - const bool FoundB = std::find(Ids.begin(), Ids.end(), "enum_b") != Ids.end(); - CHECK(FoundA); - CHECK(FoundB); - - HubInstance->Deprovision("enum_a"); - Ids.clear(); - ProvisionedCount = 0; - HubInstance->EnumerateModules([&](std::string_view ModuleId, const Hub::InstanceInfo& InstanceInfo) { - Ids.push_back(std::string(ModuleId)); - if (InstanceInfo.State == HubInstanceState::Provisioned) - { - ProvisionedCount++; - } - }); - REQUIRE_EQ(Ids.size(), 1u); - CHECK_EQ(Ids[0], "enum_b"); - CHECK_EQ(ProvisionedCount, 1); -} - -TEST_CASE("hub.max_instance_count") +TEST_CASE("hub.enumerate_and_instance_tracking") { ScopedTemporaryDirectory TempDir; std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path()); @@ -1756,22 +2051,56 @@ TEST_CASE("hub.max_instance_count") HubProvisionedInstanceInfo Info; { - const Hub::Response R = HubInstance->Provision("max_a", Info); + const Hub::Response R = HubInstance->Provision("track_a", Info); REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message); } CHECK_GE(HubInstance->GetMaxInstanceCount(), 1); { - const Hub::Response R = HubInstance->Provision("max_b", Info); + const Hub::Response R = HubInstance->Provision("track_b", Info); REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message); } CHECK_GE(HubInstance->GetMaxInstanceCount(), 2); + // Enumerate both modules + { + std::vector<std::string> Ids; + int ProvisionedCount = 0; + HubInstance->EnumerateModules([&](std::string_view ModuleId, const Hub::InstanceInfo& InstanceInfo) { + Ids.push_back(std::string(ModuleId)); + if (InstanceInfo.State == HubInstanceState::Provisioned) + { + ProvisionedCount++; + } + }); + CHECK_EQ(Ids.size(), 2u); + CHECK_EQ(ProvisionedCount, 2); + CHECK(std::find(Ids.begin(), Ids.end(), "track_a") != Ids.end()); + CHECK(std::find(Ids.begin(), Ids.end(), "track_b") != Ids.end()); + } + const int MaxAfterTwo = HubInstance->GetMaxInstanceCount(); - HubInstance->Deprovision("max_a"); + // Deprovision one - max instance count must not decrease + HubInstance->Deprovision("track_a"); CHECK_EQ(HubInstance->GetInstanceCount(), 1); CHECK_EQ(HubInstance->GetMaxInstanceCount(), MaxAfterTwo); + + // Enumerate after deprovision + { + std::vector<std::string> Ids; + int ProvisionedCount = 0; + HubInstance->EnumerateModules([&](std::string_view ModuleId, const Hub::InstanceInfo& InstanceInfo) { + Ids.push_back(std::string(ModuleId)); + if (InstanceInfo.State == HubInstanceState::Provisioned) + { + ProvisionedCount++; + } + }); + REQUIRE_EQ(Ids.size(), 1u); + CHECK_EQ(Ids[0], "track_b"); + CHECK_EQ(ProvisionedCount, 1); + } } TEST_CASE("hub.concurrent_callbacks") @@ -1917,7 +2246,7 @@ TEST_CASE("hub.job_object") } # endif // ZEN_PLATFORM_WINDOWS -TEST_CASE("hub.hibernate_wake") +TEST_CASE("hub.hibernate_wake_obliterate") { ScopedTemporaryDirectory TempDir; Hub::Configuration Config; @@ -1927,6 +2256,11 @@ TEST_CASE("hub.hibernate_wake") HubProvisionedInstanceInfo ProvInfo; Hub::InstanceInfo Info; + // Error cases on non-existent modules (no provision needed) + CHECK(HubInstance->Hibernate("never_provisioned").ResponseCode == Hub::EResponseCode::NotFound); + CHECK(HubInstance->Wake("never_provisioned").ResponseCode == Hub::EResponseCode::NotFound); + CHECK(HubInstance->Deprovision("never_provisioned").ResponseCode == Hub::EResponseCode::NotFound); + // Provision { const Hub::Response R = HubInstance->Provision("hib_a", ProvInfo); @@ -1934,82 +2268,104 @@ TEST_CASE("hub.hibernate_wake") } REQUIRE(HubInstance->Find("hib_a", &Info)); CHECK_EQ(Info.State, HubInstanceState::Provisioned); + const std::chrono::system_clock::time_point ProvisionedTime = Info.StateChangeTime; + CHECK_NE(ProvisionedTime, std::chrono::system_clock::time_point::min()); + CHECK_LE(ProvisionedTime, std::chrono::system_clock::now()); { HttpClient ModClient(fmt::format("http://localhost:{}", ProvInfo.Port), kFastTimeout); CHECK(ModClient.Get("/health/")); } + // Double-wake on provisioned module is idempotent + CHECK(HubInstance->Wake("hib_a").ResponseCode == Hub::EResponseCode::Completed); + // Hibernate - const Hub::Response HibernateResult = HubInstance->Hibernate("hib_a"); - REQUIRE_MESSAGE(HibernateResult.ResponseCode == Hub::EResponseCode::Completed, HibernateResult.Message); + { + const Hub::Response R = HubInstance->Hibernate("hib_a"); + REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message); + } REQUIRE(HubInstance->Find("hib_a", &Info)); CHECK_EQ(Info.State, HubInstanceState::Hibernated); + const std::chrono::system_clock::time_point HibernatedTime = Info.StateChangeTime; + CHECK_GE(HibernatedTime, ProvisionedTime); { HttpClient ModClient(fmt::format("http://localhost:{}", ProvInfo.Port), kFastTimeout); CHECK(!ModClient.Get("/health/")); } + // Double-hibernate on already-hibernated module is idempotent + CHECK(HubInstance->Hibernate("hib_a").ResponseCode == Hub::EResponseCode::Completed); + // Wake - const Hub::Response WakeResult = HubInstance->Wake("hib_a"); - REQUIRE_MESSAGE(WakeResult.ResponseCode == Hub::EResponseCode::Completed, WakeResult.Message); + { + const Hub::Response R = HubInstance->Wake("hib_a"); + REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message); + } REQUIRE(HubInstance->Find("hib_a", &Info)); CHECK_EQ(Info.State, HubInstanceState::Provisioned); + CHECK_GE(Info.StateChangeTime, HibernatedTime); { HttpClient ModClient(fmt::format("http://localhost:{}", ProvInfo.Port), kFastTimeout); CHECK(ModClient.Get("/health/")); } - // Deprovision - const Hub::Response DeprovisionResult = HubInstance->Deprovision("hib_a"); - CHECK(DeprovisionResult.ResponseCode == Hub::EResponseCode::Completed); - CHECK_FALSE(HubInstance->Find("hib_a")); + // Hibernate again for obliterate-from-hibernated test { - HttpClient ModClient(fmt::format("http://localhost:{}", ProvInfo.Port), kFastTimeout); - CHECK(!ModClient.Get("/health/")); + const Hub::Response R = HubInstance->Hibernate("hib_a"); + REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message); } -} - -TEST_CASE("hub.hibernate_wake_errors") -{ - ScopedTemporaryDirectory TempDir; - Hub::Configuration Config; - Config.BasePortNumber = 22700; - std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config); - - HubProvisionedInstanceInfo ProvInfo; + REQUIRE(HubInstance->Find("hib_a", &Info)); + CHECK_EQ(Info.State, HubInstanceState::Hibernated); - // Hibernate/wake on a non-existent module - returns NotFound (-> 404) - CHECK(HubInstance->Hibernate("never_provisioned").ResponseCode == Hub::EResponseCode::NotFound); - CHECK(HubInstance->Wake("never_provisioned").ResponseCode == Hub::EResponseCode::NotFound); + // Obliterate from hibernated + { + const Hub::Response R = HubInstance->Obliterate("hib_a"); + CHECK(R.ResponseCode == Hub::EResponseCode::Completed); + } + CHECK_EQ(HubInstance->GetInstanceCount(), 0); + CHECK_FALSE(HubInstance->Find("hib_a")); - // Double-hibernate: second hibernate on already-hibernated module returns Completed (idempotent) + // Re-provision for obliterate-from-provisioned test { - const Hub::Response R = HubInstance->Provision("err_b", ProvInfo); + const Hub::Response R = HubInstance->Provision("hib_a", ProvInfo); REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message); } + REQUIRE(HubInstance->Find("hib_a", &Info)); + CHECK_EQ(Info.State, HubInstanceState::Provisioned); { - const Hub::Response R = HubInstance->Hibernate("err_b"); - REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message); + HttpClient ModClient(fmt::format("http://localhost:{}", ProvInfo.Port), kFastTimeout); + CHECK(ModClient.Get("/health/")); } + // Obliterate from provisioned + { + const Hub::Response R = HubInstance->Obliterate("hib_a"); + CHECK(R.ResponseCode == Hub::EResponseCode::Completed); + } + CHECK_EQ(HubInstance->GetInstanceCount(), 0); + CHECK_FALSE(HubInstance->Find("hib_a")); { - const Hub::Response HibResp = HubInstance->Hibernate("err_b"); - CHECK(HibResp.ResponseCode == Hub::EResponseCode::Completed); + HttpClient ModClient(fmt::format("http://localhost:{}", ProvInfo.Port), kFastTimeout); + CHECK(!ModClient.Get("/health/")); } - // Wake on provisioned: succeeds (-> Provisioned), then wake again returns Completed (idempotent) + // Obliterate deprovisioned module (not tracked by hub, backend data may exist) { - const Hub::Response R = HubInstance->Wake("err_b"); + const Hub::Response R = HubInstance->Provision("hib_a", ProvInfo); REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message); } - { - const Hub::Response WakeResp = HubInstance->Wake("err_b"); - CHECK(WakeResp.ResponseCode == Hub::EResponseCode::Completed); + const Hub::Response R = HubInstance->Deprovision("hib_a"); + REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message); + } + CHECK_FALSE(HubInstance->Find("hib_a")); + { + const Hub::Response R = HubInstance->Obliterate("hib_a"); + CHECK(R.ResponseCode == Hub::EResponseCode::Completed); } - // Deprovision not-found - returns NotFound (-> 404) - CHECK(HubInstance->Deprovision("never_provisioned").ResponseCode == Hub::EResponseCode::NotFound); + // Obliterate of a never-provisioned module also succeeds (no-op backend cleanup) + CHECK(HubInstance->Obliterate("never_existed").ResponseCode == Hub::EResponseCode::Completed); } TEST_CASE("hub.async_hibernate_wake") @@ -2019,8 +2375,8 @@ TEST_CASE("hub.async_hibernate_wake") Hub::Configuration Config; Config.BasePortNumber = 23000; - WorkerThreadPool WorkerPool(2, "hub_async_hib_wake"); - std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config, {}, &WorkerPool); + hub_testutils::TestHubPools Pools(2); + std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config, {}, &Pools); HubProvisionedInstanceInfo ProvInfo; Hub::InstanceInfo Info; @@ -2150,25 +2506,21 @@ TEST_CASE("hub.recover_process_crash") if (HubInstance->Find("module_a", &InstanceInfo) && InstanceInfo.State == HubInstanceState::Provisioned && ModClient.Get("/health/")) { - // Recovery must reuse the same port - the instance was never removed from the hub's - // port table during recovery, so AttemptRecoverInstance reuses m_Config.BasePort. CHECK_EQ(InstanceInfo.Port, Info.Port); Recovered = true; break; } } - CHECK_MESSAGE(Recovered, "Instance did not recover within timeout"); + REQUIRE_MESSAGE(Recovered, "Instance did not recover within timeout"); // Verify the full crash/recovery callback sequence { RwLock::SharedLockScope _(CaptureMutex); REQUIRE_GE(Transitions.size(), 3u); - // Find the Provisioned->Crashed transition const auto CrashedIt = std::find_if(Transitions.begin(), Transitions.end(), [](const TransitionRecord& R) { return R.OldState == HubInstanceState::Provisioned && R.NewState == HubInstanceState::Crashed; }); REQUIRE_NE(CrashedIt, Transitions.end()); - // Recovery sequence follows: Crashed->Recovering, Recovering->Provisioned const auto RecoveringIt = CrashedIt + 1; REQUIRE_NE(RecoveringIt, Transitions.end()); CHECK_EQ(RecoveringIt->OldState, HubInstanceState::Crashed); @@ -2178,44 +2530,6 @@ TEST_CASE("hub.recover_process_crash") CHECK_EQ(RecoveredIt->OldState, HubInstanceState::Recovering); CHECK_EQ(RecoveredIt->NewState, HubInstanceState::Provisioned); } -} - -TEST_CASE("hub.recover_process_crash_then_deprovision") -{ - ScopedTemporaryDirectory TempDir; - - // Fast watchdog cycle so crash detection is near-instant instead of waiting up to the 3s default. - Hub::Configuration Config; - Config.WatchDog.CycleInterval = std::chrono::milliseconds(10); - Config.WatchDog.InstanceCheckThrottle = std::chrono::milliseconds(1); - - std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config); - - HubProvisionedInstanceInfo Info; - { - const Hub::Response R = HubInstance->Provision("module_a", Info); - REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message); - } - - // Kill the child process, wait for the watchdog to detect and recover the instance. - HubInstance->TerminateModuleForTesting("module_a"); - - constexpr auto kPollIntervalMs = std::chrono::milliseconds(50); - constexpr auto kTimeoutMs = std::chrono::seconds(15); - const auto Deadline = std::chrono::steady_clock::now() + kTimeoutMs; - - bool Recovered = false; - while (std::chrono::steady_clock::now() < Deadline) - { - std::this_thread::sleep_for(kPollIntervalMs); - Hub::InstanceInfo InstanceInfo; - if (HubInstance->Find("module_a", &InstanceInfo) && InstanceInfo.State == HubInstanceState::Provisioned) - { - Recovered = true; - break; - } - } - REQUIRE_MESSAGE(Recovered, "Instance did not recover within timeout"); // After recovery, deprovision should succeed and a re-provision should work. { @@ -2244,8 +2558,8 @@ TEST_CASE("hub.async_provision_concurrent") Config.BasePortNumber = 22800; Config.InstanceLimit = kModuleCount; - WorkerThreadPool WorkerPool(4, "hub_async_concurrent"); - std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config, {}, &WorkerPool); + hub_testutils::TestHubPools Pools(4); + std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config, {}, &Pools); std::vector<HubProvisionedInstanceInfo> Infos(kModuleCount); std::vector<std::string> Reasons(kModuleCount); @@ -2326,8 +2640,8 @@ TEST_CASE("hub.async_provision_shutdown_waits") Config.InstanceLimit = kModuleCount; Config.BasePortNumber = 22900; - WorkerThreadPool WorkerPool(2, "hub_async_shutdown"); - std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config, {}, &WorkerPool); + hub_testutils::TestHubPools Pools(2); + std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config, {}, &Pools); std::vector<HubProvisionedInstanceInfo> Infos(kModuleCount); @@ -2352,15 +2666,15 @@ TEST_CASE("hub.async_provision_shutdown_waits") TEST_CASE("hub.async_provision_rejected") { - // Rejection from CanProvisionInstance fires synchronously even when a WorkerPool is present. + // Rejection from CanProvisionInstanceLocked fires synchronously even when a WorkerPool is present. ScopedTemporaryDirectory TempDir; Hub::Configuration Config; Config.InstanceLimit = 1; Config.BasePortNumber = 23100; - WorkerThreadPool WorkerPool(2, "hub_async_rejected"); - std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config, {}, &WorkerPool); + hub_testutils::TestHubPools Pools(2); + std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config, {}, &Pools); HubProvisionedInstanceInfo Info; @@ -2369,7 +2683,7 @@ TEST_CASE("hub.async_provision_rejected") REQUIRE_MESSAGE(FirstResult.ResponseCode == Hub::EResponseCode::Accepted, FirstResult.Message); REQUIRE_NE(Info.Port, 0); - // Second provision: CanProvisionInstance rejects synchronously (limit reached), returns Rejected + // Second provision: CanProvisionInstanceLocked rejects synchronously (limit reached), returns Rejected HubProvisionedInstanceInfo Info2; const Hub::Response SecondResult = HubInstance->Provision("async_r2", Info2); CHECK(SecondResult.ResponseCode == Hub::EResponseCode::Rejected); @@ -2448,12 +2762,12 @@ TEST_CASE("hub.instance.inactivity.deprovision") // Phase 1: immediately after setup all three instances must still be alive. // No timeout has elapsed yet (only 100ms have passed). - CHECK_MESSAGE(HubInstance->Find("idle"), "idle was deprovisioned within 100ms - its 2s provisioned timeout has not elapsed"); + CHECK_MESSAGE(HubInstance->Find("idle"), "idle was deprovisioned within 100ms - its 4s provisioned timeout has not elapsed"); CHECK_MESSAGE(HubInstance->Find("idle_hib"), "idle_hib was deprovisioned within 100ms - its 1s hibernated timeout has not elapsed"); CHECK_MESSAGE(HubInstance->Find("persistent"), - "persistent was deprovisioned within 100ms - its 2s provisioned timeout has not elapsed"); + "persistent was deprovisioned within 100ms - its 4s provisioned timeout has not elapsed"); // Phase 2: idle_hib must be deprovisioned by the watchdog within its 1s hibernated timeout. // idle must remain alive - its 2s provisioned timeout has not elapsed yet. @@ -2477,7 +2791,7 @@ TEST_CASE("hub.instance.inactivity.deprovision") CHECK_MESSAGE(!HubInstance->Find("idle_hib"), "idle_hib should still be gone - it was deprovisioned in phase 2"); - CHECK_MESSAGE(!HubInstance->Find("idle"), "idle should be gone after its 3s provisioned timeout elapsed"); + CHECK_MESSAGE(!HubInstance->Find("idle"), "idle should be gone after its 4s provisioned timeout elapsed"); CHECK_MESSAGE(HubInstance->Find("persistent"), "persistent was incorrectly deprovisioned - its activity timer was reset by PokeInstance"); @@ -2485,6 +2799,55 @@ TEST_CASE("hub.instance.inactivity.deprovision") HubInstance->Shutdown(); } +TEST_CASE("hub.machine_metrics") +{ + ScopedTemporaryDirectory TempDir; + + std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), {}); + + // UpdateMachineMetrics() is called synchronously in the Hub constructor, so metrics + // are available immediately without waiting for a watchdog cycle. + SystemMetrics SysMetrics; + DiskSpace Disk; + HubInstance->GetMachineMetrics(SysMetrics, Disk); + + CHECK_GT(Disk.Total, 0u); + CHECK_LE(Disk.Free, Disk.Total); + + CHECK_GT(SysMetrics.SystemMemoryMiB, 0u); + CHECK_LE(SysMetrics.AvailSystemMemoryMiB, SysMetrics.SystemMemoryMiB); + + CHECK_GT(SysMetrics.VirtualMemoryMiB, 0u); + CHECK_LE(SysMetrics.AvailVirtualMemoryMiB, SysMetrics.VirtualMemoryMiB); +} + +TEST_CASE("hub.provision_rejected_resource_limits") +{ + // The Hub constructor calls UpdateMachineMetrics() synchronously, so CanProvisionInstanceLocked + // can enforce limits immediately without waiting for a watchdog cycle. + ScopedTemporaryDirectory TempDir; + + { + Hub::Configuration Config; + Config.ResourceLimits.DiskUsageBytes = 1; + std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config); + HubProvisionedInstanceInfo Info; + const Hub::Response Result = HubInstance->Provision("disk_limit", Info); + CHECK(Result.ResponseCode == Hub::EResponseCode::Rejected); + CHECK_NE(Result.Message.find("disk usage"), std::string::npos); + } + + { + Hub::Configuration Config; + Config.ResourceLimits.MemoryUsageBytes = 1; + std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config); + HubProvisionedInstanceInfo Info; + const Hub::Response Result = HubInstance->Provision("mem_limit", Info); + CHECK(Result.ResponseCode == Hub::EResponseCode::Rejected); + CHECK_NE(Result.Message.find("ram usage"), std::string::npos); + } +} + TEST_SUITE_END(); void diff --git a/src/zenserver/hub/hub.h b/src/zenserver/hub/hub.h index c343b19e2..40d046ce0 100644 --- a/src/zenserver/hub/hub.h +++ b/src/zenserver/hub/hub.h @@ -6,6 +6,8 @@ #include "resourcemetrics.h" #include "storageserverinstance.h" +#include <zencore/compactbinary.h> +#include <zencore/filesystem.h> #include <zencore/system.h> #include <zenutil/zenserverprocess.h> @@ -16,6 +18,7 @@ #include <memory> #include <thread> #include <unordered_map> +#include <unordered_set> namespace zen { @@ -64,10 +67,20 @@ public: uint32_t InstanceHttpThreadCount = 0; // Automatic int InstanceCoreLimit = 0; // Automatic + std::string InstanceMalloc; + std::string InstanceTrace; + std::string InstanceTraceHost; + std::string InstanceTraceFile; std::filesystem::path InstanceConfigPath; std::string HydrationTargetSpecification; + CbObject HydrationOptions; WatchDogConfiguration WatchDog; + + ResourceMetrics ResourceLimits; + + WorkerThreadPool* OptionalProvisionWorkerPool = nullptr; + WorkerThreadPool* OptionalHydrationWorkerPool = nullptr; }; typedef std::function< @@ -76,7 +89,6 @@ public: Hub(const Configuration& Config, ZenServerEnvironment&& RunEnvironment, - WorkerThreadPool* OptionalWorkerPool = nullptr, AsyncModuleStateChangeCallbackFunc&& ModuleStateChangeCallback = {}); ~Hub(); @@ -86,7 +98,7 @@ public: struct InstanceInfo { HubInstanceState State = HubInstanceState::Unprovisioned; - std::chrono::system_clock::time_point ProvisionTime; + std::chrono::system_clock::time_point StateChangeTime; ProcessMetrics Metrics; uint16_t Port = 0; }; @@ -126,6 +138,14 @@ public: Response Deprovision(const std::string& ModuleId); /** + * Obliterate a storage server instance and all associated data. + * Shuts down the process, deletes backend hydration data, and cleans local state. + * + * @param ModuleId The ID of the module to obliterate. + */ + Response Obliterate(const std::string& ModuleId); + + /** * Hibernate a storage server instance for the given module ID. * The instance is shut down but its data is preserved; it can be woken later. * @@ -160,6 +180,10 @@ public: int GetMaxInstanceCount() const { return m_MaxInstanceCount.load(); } + void GetMachineMetrics(SystemMetrics& OutSystemMetrict, DiskSpace& OutDiskSpace) const; + + bool IsInstancePort(uint16_t Port) const; + const Configuration& GetConfig() const { return m_Config; } #if ZEN_WITH_TESTS @@ -175,15 +199,31 @@ private: AsyncModuleStateChangeCallbackFunc m_ModuleStateChangeCallback; - std::string m_HydrationTargetSpecification; - std::filesystem::path m_HydrationTempPath; + std::unique_ptr<HydrationBase> m_Hydration; + std::filesystem::path m_HydrationTempPath; #if ZEN_PLATFORM_WINDOWS JobObject m_JobObject; #endif - RwLock m_Lock; + mutable RwLock m_Lock; std::unordered_map<std::string, size_t> m_InstanceLookup; + // Mirrors ProcessMetrics with atomic fields, enabling lock-free reads alongside watchdog writes. + struct AtomicProcessMetrics + { + std::atomic<uint64_t> MemoryBytes = 0; + std::atomic<uint64_t> KernelTimeMs = 0; + std::atomic<uint64_t> UserTimeMs = 0; + std::atomic<uint64_t> WorkingSetSize = 0; + std::atomic<uint64_t> PeakWorkingSetSize = 0; + std::atomic<uint64_t> PagefileUsage = 0; + std::atomic<uint64_t> PeakPagefileUsage = 0; + + ProcessMetrics Load() const; + void Store(const ProcessMetrics& Metrics); + void Reset(); + }; + struct ActiveInstance { // Invariant: Instance == nullptr if and only if State == Unprovisioned. @@ -192,11 +232,16 @@ private: // without holding the hub lock. std::unique_ptr<StorageServerInstance> Instance; std::atomic<HubInstanceState> State = HubInstanceState::Unprovisioned; - // TODO: We should move current metrics here (from StorageServerInstance) - // Read and updated by WatchDog, updates to State triggers a reset of both + // Process metrics - written by WatchDog (inside instance shared lock), read lock-free. + AtomicProcessMetrics ProcessMetrics; + + // Activity tracking - written by WatchDog, reset on every state transition. std::atomic<uint64_t> LastKnownActivitySum = 0; std::atomic<std::chrono::system_clock::time_point> LastActivityTime = std::chrono::system_clock::time_point::min(); + + // Set in UpdateInstanceStateLocked on every state transition; read lock-free by Find/EnumerateModules. + std::atomic<std::chrono::system_clock::time_point> StateChangeTime = std::chrono::system_clock::time_point::min(); }; // UpdateInstanceState is overloaded to accept a locked instance pointer (exclusive or shared) or the hub exclusive @@ -224,23 +269,23 @@ private: } HubInstanceState UpdateInstanceStateLocked(size_t ActiveInstanceIndex, HubInstanceState NewState); - std::vector<ActiveInstance> m_ActiveInstances; - std::deque<size_t> m_FreeActiveInstanceIndexes; - ResourceMetrics m_ResourceLimits; - SystemMetrics m_HostMetrics; - std::atomic<int> m_MaxInstanceCount = 0; - std::thread m_WatchDog; + std::vector<ActiveInstance> m_ActiveInstances; + std::deque<size_t> m_FreeActiveInstanceIndexes; + SystemMetrics m_SystemMetrics; + DiskSpace m_DiskSpace; + std::atomic<int> m_MaxInstanceCount = 0; + std::thread m_WatchDog; + std::unordered_set<std::string> m_ObliteratingInstances; Event m_WatchDogEvent; void WatchDog(); + void UpdateMachineMetrics(); bool CheckInstanceStatus(HttpClient& ActivityHttpClient, StorageServerInstance::SharedLockedPtr&& LockedInstance, size_t ActiveInstanceIndex); void AttemptRecoverInstance(std::string_view ModuleId); - void UpdateStats(); - void UpdateCapacityMetrics(); - bool CanProvisionInstance(std::string_view ModuleId, std::string& OutReason); + bool CanProvisionInstanceLocked(std::string_view ModuleId, std::string& OutReason); uint16_t GetInstanceIndexAssignedPort(size_t ActiveInstanceIndex) const; Response InternalDeprovision(const std::string& ModuleId, std::function<bool(ActiveInstance& Instance)>&& DeprovisionGate); @@ -248,9 +293,12 @@ private: size_t ActiveInstanceIndex, HubInstanceState OldState, bool IsNewInstance); - void CompleteDeprovision(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex); - void CompleteHibernate(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex, HubInstanceState OldState); - void CompleteWake(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex, HubInstanceState OldState); + void CompleteDeprovision(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex, HubInstanceState OldState); + void CompleteObliterate(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex); + void CompleteHibernate(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex, HubInstanceState OldState); + void CompleteWake(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex, HubInstanceState OldState); + void RemoveInstance(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex, std::string_view ModuleId); + void ObliterateBackendData(std::string_view ModuleId); // Notifications may fire slightly out of sync with the Hub's internal State flag. // The guarantee is that notifications are sent in the correct order, but the State diff --git a/src/zenserver/hub/hubinstancestate.cpp b/src/zenserver/hub/hubinstancestate.cpp index c47fdd294..310305e5d 100644 --- a/src/zenserver/hub/hubinstancestate.cpp +++ b/src/zenserver/hub/hubinstancestate.cpp @@ -29,6 +29,8 @@ ToString(HubInstanceState State) return "crashed"; case HubInstanceState::Recovering: return "recovering"; + case HubInstanceState::Obliterating: + return "obliterating"; } ZEN_ASSERT(false); return "unknown"; diff --git a/src/zenserver/hub/hubinstancestate.h b/src/zenserver/hub/hubinstancestate.h index c895f75d1..c7188aa5c 100644 --- a/src/zenserver/hub/hubinstancestate.h +++ b/src/zenserver/hub/hubinstancestate.h @@ -20,7 +20,8 @@ enum class HubInstanceState : uint32_t Hibernating, // Provisioned -> Hibernated (Shutting down process, preserving data on disk) Waking, // Hibernated -> Provisioned (Starting process from preserved data) Deprovisioning, // Provisioned/Hibernated/Crashed -> Unprovisioned (Shutting down process and cleaning up data) - Recovering, // Crashed -> Provisioned/Deprovisioned (Attempting in-place restart after a crash) + Recovering, // Crashed -> Provisioned/Unprovisioned (Attempting in-place restart after a crash) + Obliterating, // Provisioned/Hibernated/Crashed -> Unprovisioned (Destroying all local and backend data) }; std::string_view ToString(HubInstanceState State); diff --git a/src/zenserver/hub/hydration.cpp b/src/zenserver/hub/hydration.cpp index 541127590..2f224c357 100644 --- a/src/zenserver/hub/hydration.cpp +++ b/src/zenserver/hub/hydration.cpp @@ -5,20 +5,25 @@ #include <zencore/basicfile.h> #include <zencore/compactbinary.h> #include <zencore/compactbinarybuilder.h> +#include <zencore/compactbinaryutil.h> +#include <zencore/compress.h> #include <zencore/except_fmt.h> #include <zencore/filesystem.h> #include <zencore/fmtutils.h> #include <zencore/logging.h> +#include <zencore/parallelwork.h> +#include <zencore/stream.h> #include <zencore/system.h> +#include <zencore/timer.h> #include <zenutil/cloud/imdscredentials.h> #include <zenutil/cloud/s3client.h> +#include <zenutil/filesystemutils.h> -ZEN_THIRD_PARTY_INCLUDES_START -#include <json11.hpp> -ZEN_THIRD_PARTY_INCLUDES_END +#include <numeric> +#include <unordered_map> +#include <unordered_set> #if ZEN_WITH_TESTS -# include <zencore/parallelwork.h> # include <zencore/testing.h> # include <zencore/testutils.h> # include <zencore/thread.h> @@ -29,7 +34,7 @@ ZEN_THIRD_PARTY_INCLUDES_END namespace zen { -namespace { +namespace hydration_impl { /// UTC time decomposed to calendar fields with sub-second milliseconds. struct UtcTime @@ -55,537 +60,1161 @@ namespace { } }; -} // namespace - -/////////////////////////////////////////////////////////////////////////// - -constexpr std::string_view FileHydratorPrefix = "file://"; - -struct FileHydrator : public HydrationStrategyBase -{ - virtual void Configure(const HydrationConfig& Config) override; - virtual void Hydrate() override; - virtual void Dehydrate() override; - -private: - HydrationConfig m_Config; - std::filesystem::path m_StorageModuleRootDir; -}; - -void -FileHydrator::Configure(const HydrationConfig& Config) -{ - m_Config = Config; - - std::filesystem::path ConfigPath(Utf8ToWide(m_Config.TargetSpecification.substr(FileHydratorPrefix.length()))); - MakeSafeAbsolutePathInPlace(ConfigPath); + std::filesystem::path FastRelativePath(const std::filesystem::path& Root, const std::filesystem::path& Abs) + { + auto [_, ItAbs] = std::mismatch(Root.begin(), Root.end(), Abs.begin(), Abs.end()); + std::filesystem::path RelativePath; + for (auto I = ItAbs; I != Abs.end(); I++) + { + RelativePath = RelativePath / *I; + } + return RelativePath; + } - if (!std::filesystem::exists(ConfigPath)) + void CleanDirectory(WorkerThreadPool& WorkerPool, + std::atomic<bool>& AbortFlag, + std::atomic<bool>& PauseFlag, + const std::filesystem::path& Path) { - throw std::invalid_argument(fmt::format("Target does not exist: '{}'", ConfigPath.string())); + CleanDirectory(WorkerPool, AbortFlag, PauseFlag, Path, std::vector<std::string>{}, {}, 0); } - m_StorageModuleRootDir = ConfigPath / m_Config.ModuleId; + /////////////////////////////////////////////////////////////////////// + // Per-module storage interface driven by IncrementalHydrator. - CreateDirectories(m_StorageModuleRootDir); -} + class StorageBase + { + public: + virtual ~StorageBase() = default; + + virtual void SaveMetadata(const CbObject& Data) = 0; + virtual CbObject LoadMetadata() = 0; + virtual CbObject GetSettings() = 0; + virtual void ParseSettings(const CbObjectView& Settings) = 0; + virtual std::vector<IoHash> List() = 0; + virtual void Put(ParallelWork& Work, + WorkerThreadPool& WorkerPool, + const IoHash& Hash, + uint64_t Size, + const std::filesystem::path& SourcePath) = 0; + virtual void Get(ParallelWork& Work, + WorkerThreadPool& WorkerPool, + const IoHash& Hash, + uint64_t Size, + const std::filesystem::path& DestinationPath) = 0; + virtual void Touch(ParallelWork& Work, WorkerThreadPool& WorkerPool, const IoHash& Hash) = 0; + virtual void Delete(ParallelWork& Work, WorkerThreadPool& WorkerPool) = 0; + }; -void -FileHydrator::Hydrate() -{ - ZEN_INFO("Hydrating state from '{}' to '{}'", m_StorageModuleRootDir, m_Config.ServerStateDir); + class FileStorage : public StorageBase + { + public: + static constexpr std::string_view Prefix = "file://"; + static constexpr std::string_view Type = "file"; + + explicit FileStorage(std::filesystem::path ModulePath); + + virtual void SaveMetadata(const CbObject& Data) override; + virtual CbObject LoadMetadata() override; + virtual CbObject GetSettings() override { return {}; } + virtual void ParseSettings(const CbObjectView&) override {} + virtual std::vector<IoHash> List() override; + virtual void Put(ParallelWork& Work, + WorkerThreadPool& WorkerPool, + const IoHash& Hash, + uint64_t Size, + const std::filesystem::path& SourcePath) override; + virtual void Get(ParallelWork& Work, + WorkerThreadPool& WorkerPool, + const IoHash& Hash, + uint64_t Size, + const std::filesystem::path& DestinationPath) override; + virtual void Touch(ParallelWork&, WorkerThreadPool&, const IoHash&) override {} + virtual void Delete(ParallelWork& Work, WorkerThreadPool& WorkerPool) override; + + private: + std::filesystem::path m_StoragePath; + std::filesystem::path m_StatePathName; + std::filesystem::path m_CASPath; + }; - // Ensure target is clean - ZEN_DEBUG("Wiping server state at '{}'", m_Config.ServerStateDir); - const bool ForceRemoveReadOnlyFiles = true; - CleanDirectory(m_Config.ServerStateDir, ForceRemoveReadOnlyFiles); + class S3Storage : public StorageBase + { + public: + static constexpr std::string_view Prefix = "s3://"; + static constexpr std::string_view Type = "s3"; + static constexpr uint64_t DefaultMultipartChunkSize = 32u * 1024u * 1024u; + + S3Storage(S3Client& Client, std::string KeyPrefix, std::filesystem::path TempDir, uint64_t MultipartChunkSize); + + virtual void SaveMetadata(const CbObject& Data) override; + virtual CbObject LoadMetadata() override; + virtual CbObject GetSettings() override; + virtual void ParseSettings(const CbObjectView& Settings) override; + virtual std::vector<IoHash> List() override; + virtual void Put(ParallelWork& Work, + WorkerThreadPool& WorkerPool, + const IoHash& Hash, + uint64_t Size, + const std::filesystem::path& SourcePath) override; + virtual void Get(ParallelWork& Work, + WorkerThreadPool& WorkerPool, + const IoHash& Hash, + uint64_t Size, + const std::filesystem::path& DestinationPath) override; + virtual void Touch(ParallelWork& Work, WorkerThreadPool& WorkerPool, const IoHash& Hash) override; + virtual void Delete(ParallelWork& Work, WorkerThreadPool& WorkerPool) override; + + private: + S3Client& m_Client; + std::string m_KeyPrefix; + std::filesystem::path m_TempDir; + uint64_t m_MultipartChunkSize; + }; - bool WipeServerState = false; + /////////////////////////////////////////////////////////////////////// + // FileStorage implementations - try + FileStorage::FileStorage(std::filesystem::path ModulePath) : m_StoragePath(std::move(ModulePath)) { - ZEN_DEBUG("Copying '{}' to '{}'", m_StorageModuleRootDir, m_Config.ServerStateDir); - CopyTree(m_StorageModuleRootDir, m_Config.ServerStateDir, {.EnableClone = true}); + MakeSafeAbsolutePathInPlace(m_StoragePath); + m_StatePathName = m_StoragePath / "current-state.cbo"; + m_CASPath = m_StoragePath / "cas"; + CreateDirectories(m_CASPath); } - catch (std::exception& Ex) + + void FileStorage::SaveMetadata(const CbObject& Data) { - ZEN_WARN("Copy failed: {}. Will wipe any partially copied state from '{}'", Ex.what(), m_Config.ServerStateDir); + BinaryWriter Output; + SaveCompactBinary(Output, Data); + WriteFile(m_StatePathName, IoBuffer(IoBuffer::Wrap, Output.GetData(), Output.GetSize())); + } - // We don't do the clean right here to avoid potentially running into double-throws - WipeServerState = true; + CbObject FileStorage::LoadMetadata() + { + if (!IsFile(m_StatePathName)) + { + return {}; + } + FileContents Content = ReadFile(m_StatePathName); + if (Content.ErrorCode) + { + ThrowSystemError(Content.ErrorCode.value(), "Failed to read state file"); + } + IoBuffer Payload = Content.Flatten(); + CbValidateError Error; + CbObject Result = ValidateAndReadCompactBinaryObject(std::move(Payload), Error); + if (Error != CbValidateError::None) + { + throw std::runtime_error(fmt::format("Failed to read {} state file. Reason: {}", m_StatePathName, ToString(Error))); + } + return Result; } - if (WipeServerState) + std::vector<IoHash> FileStorage::List() { - ZEN_DEBUG("Cleaning server state '{}'", m_Config.ServerStateDir); - CleanDirectory(m_Config.ServerStateDir, ForceRemoveReadOnlyFiles); + DirectoryContent DirContent; + GetDirectoryContent(m_CASPath, DirectoryContentFlags::IncludeFiles, DirContent); + std::vector<IoHash> Result; + Result.reserve(DirContent.Files.size()); + for (const std::filesystem::path& Path : DirContent.Files) + { + IoHash Hash; + if (IoHash::TryParse(Path.filename().string(), Hash)) + { + Result.push_back(Hash); + } + } + return Result; } -} -void -FileHydrator::Dehydrate() -{ - ZEN_INFO("Dehydrating state from '{}' to '{}'", m_Config.ServerStateDir, m_StorageModuleRootDir); + void FileStorage::Put(ParallelWork& Work, + WorkerThreadPool& WorkerPool, + const IoHash& Hash, + uint64_t Size, + const std::filesystem::path& SourcePath) + { + ZEN_UNUSED(Size); + Work.ScheduleWork(WorkerPool, + [this, Hash = IoHash(Hash), SourcePath = std::filesystem::path(SourcePath)](std::atomic<bool>& AbortFlag) { + if (!AbortFlag.load()) + { + std::filesystem::path DestPath = m_CASPath / fmt::format("{}", Hash); + if (std::error_code Ec = CopyFile(SourcePath, DestPath, CopyFileOptions{.EnableClone = true}); Ec) + { + throw std::system_error(Ec, fmt::format("Failed to copy '{}' to '{}'", SourcePath, DestPath)); + } + } + }); + } - const std::filesystem::path TargetDir = m_StorageModuleRootDir; + void FileStorage::Get(ParallelWork& Work, + WorkerThreadPool& WorkerPool, + const IoHash& Hash, + uint64_t Size, + const std::filesystem::path& DestinationPath) + { + ZEN_UNUSED(Size); + Work.ScheduleWork( + WorkerPool, + [this, Hash = IoHash(Hash), DestinationPath = std::filesystem::path(DestinationPath)](std::atomic<bool>& AbortFlag) { + if (!AbortFlag.load()) + { + std::filesystem::path SourcePath = m_CASPath / fmt::format("{}", Hash); + if (std::error_code Ec = CopyFile(SourcePath, DestinationPath, CopyFileOptions{.EnableClone = true}); Ec) + { + throw std::system_error(Ec, fmt::format("Failed to copy '{}' to '{}'", SourcePath, DestinationPath)); + } + } + }); + } - // Ensure target is clean. This could be replaced with an atomic copy at a later date - // (i.e copy into a temporary directory name and rename it once complete) + void FileStorage::Delete(ParallelWork& Work, WorkerThreadPool& WorkerPool) + { + ZEN_UNUSED(Work); + ZEN_UNUSED(WorkerPool); + DeleteDirectories(m_StoragePath); + } - ZEN_DEBUG("Cleaning storage root '{}'", TargetDir); - const bool ForceRemoveReadOnlyFiles = true; - CleanDirectory(TargetDir, ForceRemoveReadOnlyFiles); + /////////////////////////////////////////////////////////////////////// + // S3Storage implementations - bool CopySuccess = true; + S3Storage::S3Storage(S3Client& Client, std::string KeyPrefix, std::filesystem::path TempDir, uint64_t MultipartChunkSize) + : m_Client(Client) + , m_KeyPrefix(std::move(KeyPrefix)) + , m_TempDir(std::move(TempDir)) + , m_MultipartChunkSize(MultipartChunkSize) + { + } - try + void S3Storage::SaveMetadata(const CbObject& Data) { - ZEN_DEBUG("Copying '{}' to '{}'", m_Config.ServerStateDir, TargetDir); - CopyTree(m_Config.ServerStateDir, TargetDir, {.EnableClone = true}); + BinaryWriter Output; + SaveCompactBinary(Output, Data); + IoBuffer Payload(IoBuffer::Clone, Output.GetData(), Output.GetSize()); + + std::string Key = m_KeyPrefix + "/incremental-state.cbo"; + S3Result Result = m_Client.PutObject(Key, std::move(Payload)); + if (!Result.IsSuccess()) + { + throw zen::runtime_error("Failed to save incremental metadata to '{}': {}", Key, Result.Error); + } } - catch (std::exception& Ex) + + CbObject S3Storage::LoadMetadata() { - ZEN_WARN("Copy failed: {}. Will wipe any partially copied state from '{}'", Ex.what(), m_StorageModuleRootDir); + std::string Key = m_KeyPrefix + "/incremental-state.cbo"; + S3GetObjectResult Result = m_Client.GetObject(Key); + if (!Result.IsSuccess()) + { + if (Result.Error == S3GetObjectResult::NotFoundErrorText) + { + return {}; + } + throw zen::runtime_error("Failed to load incremental metadata from '{}': {}", Key, Result.Error); + } - // We don't do the clean right here to avoid potentially running into double-throws - CopySuccess = false; + CbValidateError Error; + CbObject Meta = ValidateAndReadCompactBinaryObject(std::move(Result.Content), Error); + if (Error != CbValidateError::None) + { + throw zen::runtime_error("Failed to parse incremental metadata from '{}': {}", Key, ToString(Error)); + } + return Meta; } - if (!CopySuccess) + CbObject S3Storage::GetSettings() { - ZEN_DEBUG("Removing partially copied state from '{}'", TargetDir); - CleanDirectory(TargetDir, ForceRemoveReadOnlyFiles); + CbObjectWriter Writer; + Writer << "MultipartChunkSize" << m_MultipartChunkSize; + return Writer.Save(); } - ZEN_DEBUG("Wiping server state '{}'", m_Config.ServerStateDir); - CleanDirectory(m_Config.ServerStateDir, ForceRemoveReadOnlyFiles); -} + void S3Storage::ParseSettings(const CbObjectView& Settings) + { + m_MultipartChunkSize = Settings["MultipartChunkSize"].AsUInt64(S3Storage::DefaultMultipartChunkSize); + } -/////////////////////////////////////////////////////////////////////////// + std::vector<IoHash> S3Storage::List() + { + std::string CasPrefix = m_KeyPrefix + "/cas/"; + S3ListObjectsResult Result = m_Client.ListObjects(CasPrefix); + if (!Result.IsSuccess()) + { + throw zen::runtime_error("Failed to list S3 objects under '{}': {}", CasPrefix, Result.Error); + } -constexpr std::string_view S3HydratorPrefix = "s3://"; + std::vector<IoHash> Hashes; + Hashes.reserve(Result.Objects.size()); + for (const S3ObjectInfo& Obj : Result.Objects) + { + size_t LastSlash = Obj.Key.rfind('/'); + if (LastSlash == std::string::npos) + { + continue; + } + IoHash Hash; + if (IoHash::TryParse(Obj.Key.substr(LastSlash + 1), Hash)) + { + Hashes.push_back(Hash); + } + } + return Hashes; + } -struct S3Hydrator : public HydrationStrategyBase -{ - void Configure(const HydrationConfig& Config) override; - void Dehydrate() override; - void Hydrate() override; + void S3Storage::Put(ParallelWork& Work, + WorkerThreadPool& WorkerPool, + const IoHash& Hash, + uint64_t Size, + const std::filesystem::path& SourcePath) + { + Work.ScheduleWork(WorkerPool, + [this, Hash = IoHash(Hash), Size, SourcePath = std::filesystem::path(SourcePath)](std::atomic<bool>& AbortFlag) { + if (AbortFlag.load()) + { + return; + } + S3Client& Client = m_Client; + std::string Key = m_KeyPrefix + "/cas/" + fmt::format("{}", Hash); + + if (Size >= (m_MultipartChunkSize + (m_MultipartChunkSize / 4))) + { + BasicFile File(SourcePath, BasicFile::Mode::kRead); + S3Result Result = Client.PutObjectMultipart( + Key, + Size, + [&File](uint64_t Offset, uint64_t ChunkSize) { return File.ReadRange(Offset, ChunkSize); }, + m_MultipartChunkSize); + if (!Result.IsSuccess()) + { + throw zen::runtime_error("Failed to upload '{}' to S3: {}", Key, Result.Error); + } + } + else + { + BasicFile File(SourcePath, BasicFile::Mode::kRead); + S3Result Result = Client.PutObject(Key, File.ReadAll()); + if (!Result.IsSuccess()) + { + throw zen::runtime_error("Failed to upload '{}' to S3: {}", Key, Result.Error); + } + } + }); + } -private: - S3Client CreateS3Client() const; - std::string BuildTimestampFolderName() const; - std::string MakeObjectKey(std::string_view FolderName, const std::filesystem::path& RelPath) const; + void S3Storage::Get(ParallelWork& Work, + WorkerThreadPool& WorkerPool, + const IoHash& Hash, + uint64_t Size, + const std::filesystem::path& DestinationPath) + { + std::string Key = m_KeyPrefix + "/cas/" + fmt::format("{}", Hash); - HydrationConfig m_Config; - std::string m_Bucket; - std::string m_KeyPrefix; // "<user-prefix>/<ModuleId>" or just "<ModuleId>" - no trailing slash - std::string m_Region; - SigV4Credentials m_Credentials; - Ref<ImdsCredentialProvider> m_CredentialProvider; -}; + if (Size >= (m_MultipartChunkSize + (m_MultipartChunkSize / 4))) + { + class WorkData + { + public: + WorkData(const std::filesystem::path& DestPath, uint64_t Size) : m_DestFile(DestPath, BasicFile::Mode::kTruncate) + { + PrepareFileForScatteredWrite(m_DestFile.Handle(), Size); + } + ~WorkData() { m_DestFile.Flush(); } + void Write(const void* Data, uint64_t Size, uint64_t Offset) { m_DestFile.Write(Data, Size, Offset); } -void -S3Hydrator::Configure(const HydrationConfig& Config) -{ - m_Config = Config; + private: + BasicFile m_DestFile; + }; - std::string_view Spec = m_Config.TargetSpecification; - Spec.remove_prefix(S3HydratorPrefix.size()); + std::shared_ptr<WorkData> Data = std::make_shared<WorkData>(DestinationPath, Size); - size_t SlashPos = Spec.find('/'); - std::string UserPrefix = SlashPos != std::string_view::npos ? std::string(Spec.substr(SlashPos + 1)) : std::string{}; - m_Bucket = std::string(SlashPos != std::string_view::npos ? Spec.substr(0, SlashPos) : Spec); - m_KeyPrefix = UserPrefix.empty() ? m_Config.ModuleId : UserPrefix + "/" + m_Config.ModuleId; + uint64_t Offset = 0; + while (Offset < Size) + { + uint64_t ChunkSize = std::min<uint64_t>(m_MultipartChunkSize, Size - Offset); - ZEN_ASSERT(!m_Bucket.empty()); + Work.ScheduleWork(WorkerPool, [this, Key = Key, Offset, ChunkSize, Data](std::atomic<bool>& AbortFlag) { + if (AbortFlag) + { + return; + } + S3GetObjectResult Chunk = m_Client.GetObjectRange(Key, Offset, ChunkSize); + if (!Chunk.IsSuccess()) + { + throw zen::runtime_error("Failed to download '{}' bytes [{}-{}] from S3: {}", + Key, + Offset, + Offset + ChunkSize - 1, + Chunk.Error); + } - std::string Region = GetEnvVariable("AWS_DEFAULT_REGION"); - if (Region.empty()) - { - Region = GetEnvVariable("AWS_REGION"); - } - if (Region.empty()) - { - Region = "us-east-1"; + Data->Write(Chunk.Content.GetData(), Chunk.Content.GetSize(), Offset); + }); + Offset += ChunkSize; + } + } + else + { + Work.ScheduleWork(WorkerPool, + [this, Key = Key, DestinationPath = std::filesystem::path(DestinationPath)](std::atomic<bool>& AbortFlag) { + if (AbortFlag) + { + return; + } + S3GetObjectResult Chunk = m_Client.GetObject(Key, m_TempDir); + if (!Chunk.IsSuccess()) + { + throw zen::runtime_error("Failed to download '{}' from S3: {}", Key, Chunk.Error); + } + + if (IoBufferFileReference FileRef; Chunk.Content.GetFileReference(FileRef)) + { + std::error_code Ec; + std::filesystem::path ChunkPath = PathFromHandle(FileRef.FileHandle, Ec); + if (Ec) + { + WriteFile(DestinationPath, Chunk.Content); + } + else + { + Chunk.Content.SetDeleteOnClose(false); + Chunk.Content = {}; + RenameFile(ChunkPath, DestinationPath, Ec); + if (Ec) + { + Chunk.Content = IoBufferBuilder::MakeFromFile(ChunkPath); + Chunk.Content.SetDeleteOnClose(true); + WriteFile(DestinationPath, Chunk.Content); + } + } + } + else + { + WriteFile(DestinationPath, Chunk.Content); + } + }); + } } - m_Region = std::move(Region); - std::string AccessKeyId = GetEnvVariable("AWS_ACCESS_KEY_ID"); - if (AccessKeyId.empty()) - { - m_CredentialProvider = Ref<ImdsCredentialProvider>(new ImdsCredentialProvider({})); - } - else + void S3Storage::Touch(ParallelWork& Work, WorkerThreadPool& WorkerPool, const IoHash& Hash) { - m_Credentials.AccessKeyId = std::move(AccessKeyId); - m_Credentials.SecretAccessKey = GetEnvVariable("AWS_SECRET_ACCESS_KEY"); - m_Credentials.SessionToken = GetEnvVariable("AWS_SESSION_TOKEN"); + Work.ScheduleWork(WorkerPool, [this, Hash = IoHash(Hash)](std::atomic<bool>& AbortFlag) { + if (AbortFlag.load()) + { + return; + } + std::string Key = m_KeyPrefix + "/cas/" + fmt::format("{}", Hash); + S3Result Result = m_Client.Touch(Key); + if (!Result.IsSuccess()) + { + throw zen::runtime_error("Failed to touch '{}' in S3: {}", Key, Result.Error); + } + }); } -} -S3Client -S3Hydrator::CreateS3Client() const -{ - S3ClientOptions Options; - Options.BucketName = m_Bucket; - Options.Region = m_Region; - - if (!m_Config.S3Endpoint.empty()) + void S3Storage::Delete(ParallelWork& Work, WorkerThreadPool& WorkerPool) { - Options.Endpoint = m_Config.S3Endpoint; - Options.PathStyle = m_Config.S3PathStyle; + std::string ModulePrefix = m_KeyPrefix + "/"; + S3ListObjectsResult ListResult = m_Client.ListObjects(ModulePrefix); + if (!ListResult.IsSuccess()) + { + throw zen::runtime_error("Failed to list S3 objects for deletion under '{}': {}", ModulePrefix, ListResult.Error); + } + for (const S3ObjectInfo& Obj : ListResult.Objects) + { + Work.ScheduleWork(WorkerPool, [this, Key = Obj.Key](std::atomic<bool>& AbortFlag) { + if (AbortFlag.load()) + { + return; + } + S3Result DelResult = m_Client.DeleteObject(Key); + if (!DelResult.IsSuccess()) + { + throw zen::runtime_error("Failed to delete S3 object '{}': {}", Key, DelResult.Error); + } + }); + } } - if (m_CredentialProvider) - { - Options.CredentialProvider = m_CredentialProvider; - } - else + /////////////////////////////////////////////////////////////////////// + // IncrementalHydrator: the only HydrationStrategyBase implementation. + // Holds a per-module StorageBase and threading context; drives the + // hydrate/dehydrate algorithm. + + class IncrementalHydrator : public HydrationStrategyBase { - Options.Credentials = m_Credentials; - } + public: + IncrementalHydrator(const HydrationConfig& Config, std::unique_ptr<StorageBase> Storage); + virtual ~IncrementalHydrator() override; - return S3Client(Options); -} + virtual void Dehydrate(const CbObject& CachedState) override; + virtual CbObject Hydrate() override; + virtual void Obliterate() override; -std::string -S3Hydrator::BuildTimestampFolderName() const -{ - UtcTime Now = UtcTime::Now(); - return fmt::format("{:04d}{:02d}{:02d}-{:02d}{:02d}{:02d}-{:03d}", - Now.Tm.tm_year + 1900, - Now.Tm.tm_mon + 1, - Now.Tm.tm_mday, - Now.Tm.tm_hour, - Now.Tm.tm_min, - Now.Tm.tm_sec, - Now.Ms); -} + private: + struct Entry + { + std::filesystem::path RelativePath; + uint64_t Size; + uint64_t ModTick; + IoHash Hash; + }; -std::string -S3Hydrator::MakeObjectKey(std::string_view FolderName, const std::filesystem::path& RelPath) const -{ - return m_KeyPrefix + "/" + std::string(FolderName) + "/" + RelPath.generic_string(); -} + std::unique_ptr<StorageBase> m_Storage; + HydrationConfig m_Config; + WorkerThreadPool m_FallbackWorkPool; + std::atomic<bool> m_FallbackAbortFlag{false}; + std::atomic<bool> m_FallbackPauseFlag{false}; + HydrationConfig::ThreadingOptions m_Threading{.WorkerPool = &m_FallbackWorkPool, + .AbortFlag = &m_FallbackAbortFlag, + .PauseFlag = &m_FallbackPauseFlag}; + }; -void -S3Hydrator::Dehydrate() -{ - ZEN_INFO("Dehydrating state from '{}' to s3://{}/{}", m_Config.ServerStateDir, m_Bucket, m_KeyPrefix); + /////////////////////////////////////////////////////////////////////// + // IncrementalHydrator implementations - try + IncrementalHydrator::IncrementalHydrator(const HydrationConfig& Config, std::unique_ptr<StorageBase> Storage) + : m_Storage(std::move(Storage)) + , m_Config(Config) + , m_FallbackWorkPool(0) { - S3Client Client = CreateS3Client(); - std::string FolderName = BuildTimestampFolderName(); - uint64_t TotalBytes = 0; - uint32_t FileCount = 0; - std::chrono::steady_clock::time_point UploadStart = std::chrono::steady_clock::now(); + if (Config.Threading) + { + m_Threading = *Config.Threading; + } + } - DirectoryContent DirContent; - GetDirectoryContent(m_Config.ServerStateDir, DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::Recursive, DirContent); + IncrementalHydrator::~IncrementalHydrator() { m_Storage.reset(); } + + void IncrementalHydrator::Dehydrate(const CbObject& CachedState) + { + Stopwatch Timer; - for (const std::filesystem::path& AbsPath : DirContent.Files) + const std::filesystem::path ServerStateDir = MakeSafeAbsolutePath(m_Config.ServerStateDir); + try { - std::filesystem::path RelPath = AbsPath.lexically_relative(m_Config.ServerStateDir); - if (RelPath.empty() || *RelPath.begin() == "..") + std::unordered_map<std::string, size_t> StateEntryLookup; + std::vector<Entry> StateEntries; + for (CbFieldView FieldView : CachedState["Files"].AsArrayView()) { - throw zen::runtime_error( - "lexically_relative produced a '..'-escape path for '{}' relative to '{}' - " - "path form mismatch (e.g. \\\\?\\ prefix on one but not the other)", - AbsPath.string(), - m_Config.ServerStateDir.string()); + CbObjectView EntryView = FieldView.AsObjectView(); + std::filesystem::path RelativePath(EntryView["Path"].AsString()); + uint64_t Size = EntryView["Size"].AsUInt64(); + uint64_t ModTick = EntryView["ModTick"].AsUInt64(); + IoHash Hash = EntryView["Hash"].AsHash(); + + StateEntryLookup.insert_or_assign(RelativePath.generic_string(), StateEntries.size()); + StateEntries.push_back(Entry{.RelativePath = RelativePath, .Size = Size, .ModTick = ModTick, .Hash = Hash}); } - std::string Key = MakeObjectKey(FolderName, RelPath); - BasicFile File(AbsPath, BasicFile::Mode::kRead); - uint64_t FileSize = File.FileSize(); + DirectoryContent DirContent; + GetDirectoryContent(*m_Threading.WorkerPool, + ServerStateDir, + DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::Recursive | + DirectoryContentFlags::IncludeFileSizes | DirectoryContentFlags::IncludeModificationTick, + DirContent); + + ZEN_INFO("Dehydrating module '{}' from folder '{}'. {} ({}) files", + m_Config.ModuleId, + m_Config.ServerStateDir, + DirContent.Files.size(), + NiceBytes(std::accumulate(DirContent.FileSizes.begin(), DirContent.FileSizes.end(), uint64_t(0)))); + + std::vector<Entry> Entries; + Entries.resize(DirContent.Files.size()); + + uint64_t TotalBytes = 0; + uint64_t TotalFiles = 0; + uint64_t HashedFiles = 0; + uint64_t HashedBytes = 0; + + std::unordered_set<IoHash> ExistsLookup; - S3Result UploadResult = - Client.PutObjectMultipart(Key, FileSize, [&File](uint64_t Offset, uint64_t Size) { return File.ReadRange(Offset, Size); }); - if (!UploadResult.IsSuccess()) { - throw zen::runtime_error("Failed to upload '{}' to S3: {}", Key, UploadResult.Error); - } + ParallelWork Work(*m_Threading.AbortFlag, *m_Threading.PauseFlag, WorkerThreadPool::EMode::EnableBacklog); - TotalBytes += FileSize; - ++FileCount; - } + for (size_t FileIndex = 0; FileIndex < DirContent.Files.size(); FileIndex++) + { + const std::filesystem::path AbsPath = MakeSafeAbsolutePath(DirContent.Files[FileIndex]); + if (AbsPath.filename() == "reserve.gc") + { + continue; + } + const std::filesystem::path RelativePath = FastRelativePath(ServerStateDir, DirContent.Files[FileIndex]); + if (*RelativePath.begin() == ".sentry-native") + { + continue; + } + if (RelativePath == ".lock") + { + continue; + } - // Write current-state.json - int64_t UploadDurationMs = - std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - UploadStart).count(); - - UtcTime Now = UtcTime::Now(); - std::string UploadTimeUtc = fmt::format("{:04d}-{:02d}-{:02d}T{:02d}:{:02d}:{:02d}.{:03d}Z", - Now.Tm.tm_year + 1900, - Now.Tm.tm_mon + 1, - Now.Tm.tm_mday, - Now.Tm.tm_hour, - Now.Tm.tm_min, - Now.Tm.tm_sec, - Now.Ms); - - CbObjectWriter Meta; - Meta << "FolderName" << FolderName; - Meta << "ModuleId" << m_Config.ModuleId; - Meta << "HostName" << GetMachineName(); - Meta << "UploadTimeUtc" << UploadTimeUtc; - Meta << "UploadDurationMs" << UploadDurationMs; - Meta << "TotalSizeBytes" << TotalBytes; - Meta << "FileCount" << FileCount; - - ExtendableStringBuilder<1024> JsonBuilder; - Meta.Save().ToJson(JsonBuilder); - - std::string MetaKey = m_KeyPrefix + "/current-state.json"; - std::string_view JsonText = JsonBuilder.ToView(); - IoBuffer MetaBuf(IoBuffer::Clone, JsonText.data(), JsonText.size()); - S3Result MetaUploadResult = Client.PutObject(MetaKey, std::move(MetaBuf)); - if (!MetaUploadResult.IsSuccess()) - { - throw zen::runtime_error("Failed to write current-state.json to '{}': {}", MetaKey, MetaUploadResult.Error); - } + Entry& CurrentEntry = Entries[TotalFiles]; + CurrentEntry.RelativePath = RelativePath; + CurrentEntry.Size = DirContent.FileSizes[FileIndex]; + CurrentEntry.ModTick = DirContent.FileModificationTicks[FileIndex]; - ZEN_INFO("Dehydration complete: {} files, {} bytes, {} ms", FileCount, TotalBytes, UploadDurationMs); - } - catch (std::exception& Ex) - { - // Any in-progress multipart upload has already been aborted by PutObjectMultipart. - // current-state.json is only written on success, so the previous S3 state remains valid. - ZEN_WARN("S3 dehydration failed: {}. S3 state not updated.", Ex.what()); - } -} + bool FoundHash = false; + if (auto KnownIt = StateEntryLookup.find(CurrentEntry.RelativePath.generic_string()); KnownIt != StateEntryLookup.end()) + { + const Entry& StateEntry = StateEntries[KnownIt->second]; + if (StateEntry.Size == CurrentEntry.Size && StateEntry.ModTick == CurrentEntry.ModTick) + { + CurrentEntry.Hash = StateEntry.Hash; + FoundHash = true; + } + } -void -S3Hydrator::Hydrate() -{ - ZEN_INFO("Hydrating state from s3://{}/{} to '{}'", m_Bucket, m_KeyPrefix, m_Config.ServerStateDir); + if (!FoundHash) + { + Work.ScheduleWork(*m_Threading.WorkerPool, + [AbsPath, EntryIndex = TotalFiles, &Entries](std::atomic<bool>& AbortFlag) { + if (AbortFlag) + { + return; + } + + Entry& CurrentEntry = Entries[EntryIndex]; + + bool FoundHash = false; + if (AbsPath.extension().empty()) + { + auto It = CurrentEntry.RelativePath.begin(); + if (It != CurrentEntry.RelativePath.end() && It->filename().string().ends_with("cas")) + { + IoHash RawHash; + uint64_t RawSize; + CompressedBuffer Compressed = CompressedBuffer::FromCompressed( + SharedBuffer(IoBufferBuilder::MakeFromFile(AbsPath)), + RawHash, + RawSize); + if (Compressed) + { + // We compose a meta-hash since taking the RawHash might collide with an + // existing non-compressed file with the same content The collision is + // unlikely except if the compressed data is zero bytes causing RawHash + // to be the same as an empty file. + IoHashStream Hasher; + Hasher.Append(RawHash.Hash, sizeof(RawHash.Hash)); + Hasher.Append(&CurrentEntry.Size, sizeof(CurrentEntry.Size)); + CurrentEntry.Hash = Hasher.GetHash(); + FoundHash = true; + } + } + } + + if (!FoundHash) + { + CurrentEntry.Hash = IoHash::HashBuffer(IoBufferBuilder::MakeFromFile(AbsPath)); + } + }); + HashedFiles++; + HashedBytes += CurrentEntry.Size; + } + TotalFiles++; + TotalBytes += CurrentEntry.Size; + } - const bool ForceRemoveReadOnlyFiles = true; + std::vector<IoHash> ExistingEntries = m_Storage->List(); + ExistsLookup.insert(ExistingEntries.begin(), ExistingEntries.end()); - // Clean temp dir before starting in case of leftover state from a previous failed hydration - ZEN_DEBUG("Cleaning temp dir '{}'", m_Config.TempDir); - CleanDirectory(m_Config.TempDir, ForceRemoveReadOnlyFiles); + Work.Wait(); - bool WipeServerState = false; + Entries.resize(TotalFiles); + } - try - { - S3Client Client = CreateS3Client(); - std::string MetaKey = m_KeyPrefix + "/current-state.json"; + uint64_t UploadedFiles = 0; + uint64_t UploadedBytes = 0; + uint64_t TouchedFiles = 0; + uint64_t TouchedBytes = 0; + { + ParallelWork Work(*m_Threading.AbortFlag, *m_Threading.PauseFlag, WorkerThreadPool::EMode::DisableBacklog); - S3HeadObjectResult HeadResult = Client.HeadObject(MetaKey); - if (HeadResult.Status == HeadObjectResult::NotFound) - { - throw zen::runtime_error("No state found in S3 at '{}'", MetaKey); - } - if (!HeadResult.IsSuccess()) - { - throw zen::runtime_error("Failed to check for state in S3 at '{}': {}", MetaKey, HeadResult.Error); - } + for (const Entry& CurrentEntry : Entries) + { + if (!ExistsLookup.contains(CurrentEntry.Hash)) + { + m_Storage->Put(Work, + *m_Threading.WorkerPool, + CurrentEntry.Hash, + CurrentEntry.Size, + MakeSafeAbsolutePath(ServerStateDir / CurrentEntry.RelativePath)); + UploadedFiles++; + UploadedBytes += CurrentEntry.Size; + } + else + { + // Refresh the backend's modification time so lifecycle-expiration policies + // do not evict CAS entries that are still referenced by this module. + m_Storage->Touch(Work, *m_Threading.WorkerPool, CurrentEntry.Hash); + TouchedFiles++; + TouchedBytes += CurrentEntry.Size; + } + } - S3GetObjectResult MetaResult = Client.GetObject(MetaKey); - if (!MetaResult.IsSuccess()) - { - throw zen::runtime_error("Failed to read current-state.json from '{}': {}", MetaKey, MetaResult.Error); - } + Work.Wait(); + uint64_t UploadTimeMs = Timer.GetElapsedTimeMs(); + + UtcTime Now = UtcTime::Now(); + std::string UploadTimeUtc = fmt::format("{:04d}-{:02d}-{:02d}T{:02d}:{:02d}:{:02d}.{:03d}Z", + Now.Tm.tm_year + 1900, + Now.Tm.tm_mon + 1, + Now.Tm.tm_mday, + Now.Tm.tm_hour, + Now.Tm.tm_min, + Now.Tm.tm_sec, + Now.Ms); + + CbObjectWriter Meta; + Meta << "SourceFolder" << ServerStateDir.generic_string(); + Meta << "ModuleId" << m_Config.ModuleId; + Meta << "HostName" << GetMachineName(); + Meta << "UploadTimeUtc" << UploadTimeUtc; + Meta << "UploadDurationMs" << UploadTimeMs; + Meta << "TotalSizeBytes" << TotalBytes; + Meta << "StorageSettings" << m_Storage->GetSettings(); + + Meta.BeginArray("Files"); + for (const Entry& CurrentEntry : Entries) + { + Meta.BeginObject(); + { + Meta << "Path" << CurrentEntry.RelativePath.generic_string(); + Meta << "Size" << CurrentEntry.Size; + Meta << "ModTick" << CurrentEntry.ModTick; + Meta << "Hash" << CurrentEntry.Hash; + } + Meta.EndObject(); + } + Meta.EndArray(); - std::string ParseError; - json11::Json MetaJson = json11::Json::parse(std::string(MetaResult.AsText()), ParseError); - if (!ParseError.empty()) - { - throw zen::runtime_error("Failed to parse current-state.json from '{}': {}", MetaKey, ParseError); - } + m_Storage->SaveMetadata(Meta.Save()); + } - std::string FolderName = MetaJson["FolderName"].string_value(); - if (FolderName.empty()) - { - throw zen::runtime_error("current-state.json from '{}' has missing or empty FolderName", MetaKey); + ZEN_DEBUG("Cleaning server state '{}'", m_Config.ServerStateDir); + CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, ServerStateDir); + + ZEN_INFO( + "Dehydration of module '{}' completed from folder '{}'. Hashed {} ({}). Uploaded {} ({}). Touched {} ({}). Total {} ({}) " + "in {}", + m_Config.ModuleId, + m_Config.ServerStateDir, + HashedFiles, + NiceBytes(HashedBytes), + UploadedFiles, + NiceBytes(UploadedBytes), + TouchedFiles, + NiceBytes(TouchedBytes), + TotalFiles, + NiceBytes(TotalBytes), + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); } - - std::string FolderPrefix = m_KeyPrefix + "/" + FolderName + "/"; - S3ListObjectsResult ListResult = Client.ListObjects(FolderPrefix); - if (!ListResult.IsSuccess()) + catch (const std::exception& Ex) { - throw zen::runtime_error("Failed to list S3 objects under '{}': {}", FolderPrefix, ListResult.Error); + ZEN_WARN("Dehydration of module '{}' failed: {}. Leaving server state '{}'", + m_Config.ModuleId, + Ex.what(), + m_Config.ServerStateDir); } + } - for (const S3ObjectInfo& Obj : ListResult.Objects) + CbObject IncrementalHydrator::Hydrate() + { + Stopwatch Timer; + + const std::filesystem::path ServerStateDir = MakeSafeAbsolutePath(m_Config.ServerStateDir); + const std::filesystem::path TempDir = MakeSafeAbsolutePath(m_Config.TempDir); + try { - if (!Obj.Key.starts_with(FolderPrefix)) + CbObject Meta = m_Storage->LoadMetadata(); + if (!Meta) { - ZEN_WARN("Skipping unexpected S3 key '{}' (expected prefix '{}')", Obj.Key, FolderPrefix); - continue; + ZEN_INFO("No dehydrated state for module {} found, cleaning server state: '{}'", + m_Config.ModuleId, + m_Config.ServerStateDir); + CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, ServerStateDir); + return CbObject(); } - std::string RelKey = Obj.Key.substr(FolderPrefix.size()); - if (RelKey.empty()) + std::unordered_map<std::string, size_t> EntryLookup; + std::vector<Entry> Entries; + uint64_t TotalSize = 0; + + for (CbFieldView FieldView : Meta["Files"]) { - continue; + CbObjectView EntryView = FieldView.AsObjectView(); + if (EntryView) + { + Entry NewEntry = {.RelativePath = std::filesystem::path(EntryView["Path"].AsString()), + .Size = EntryView["Size"].AsUInt64(), + .ModTick = EntryView["ModTick"].AsUInt64(), + .Hash = EntryView["Hash"].AsHash()}; + TotalSize += NewEntry.Size; + EntryLookup.insert_or_assign(NewEntry.RelativePath.generic_string(), Entries.size()); + Entries.emplace_back(std::move(NewEntry)); + } } - std::filesystem::path DestPath = MakeSafeAbsolutePath(m_Config.TempDir / std::filesystem::path(RelKey)); - CreateDirectories(DestPath.parent_path()); - BasicFile DestFile(DestPath, BasicFile::Mode::kTruncate); - DestFile.SetFileSize(Obj.Size); + ZEN_INFO("Hydrating module '{}' to folder '{}'. {} ({}) files", + m_Config.ModuleId, + m_Config.ServerStateDir, + Entries.size(), + NiceBytes(TotalSize)); + + m_Storage->ParseSettings(Meta["StorageSettings"].AsObjectView()); + + uint64_t DownloadedBytes = 0; + uint64_t DownloadedFiles = 0; - if (Obj.Size > 0) { - BasicFileWriter Writer(DestFile, 64 * 1024); + ParallelWork Work(*m_Threading.AbortFlag, *m_Threading.PauseFlag, WorkerThreadPool::EMode::EnableBacklog); - uint64_t Offset = 0; - while (Offset < Obj.Size) + for (const Entry& CurrentEntry : Entries) { - uint64_t ChunkSize = std::min<uint64_t>(8 * 1024 * 1024, Obj.Size - Offset); - S3GetObjectResult Chunk = Client.GetObjectRange(Obj.Key, Offset, ChunkSize); - if (!Chunk.IsSuccess()) + std::filesystem::path Path = MakeSafeAbsolutePath(TempDir / CurrentEntry.RelativePath); + CreateDirectories(Path.parent_path()); + m_Storage->Get(Work, *m_Threading.WorkerPool, CurrentEntry.Hash, CurrentEntry.Size, Path); + DownloadedBytes += CurrentEntry.Size; + DownloadedFiles++; + } + + Work.Wait(); + } + + // Downloaded successfully - swap into ServerStateDir + ZEN_DEBUG("Cleaning server state '{}'", m_Config.ServerStateDir); + CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, ServerStateDir); + + // If the two paths share at least one common component they are on the same drive/volume + // and atomic renames will succeed. Otherwise fall back to a full copy. + auto [ItTmp, ItState] = std::mismatch(TempDir.begin(), TempDir.end(), ServerStateDir.begin(), ServerStateDir.end()); + if (ItTmp != TempDir.begin()) + { + DirectoryContent DirContent; + GetDirectoryContent(*m_Threading.WorkerPool, + TempDir, + DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::IncludeDirs, + DirContent); + + for (const std::filesystem::path& AbsPath : DirContent.Directories) + { + std::filesystem::path Dest = MakeSafeAbsolutePath(ServerStateDir / AbsPath.filename()); + std::error_code Ec = RenameDirectoryWithRetry(AbsPath, Dest); + if (Ec) { - throw zen::runtime_error("Failed to download '{}' bytes [{}-{}] from S3: {}", - Obj.Key, - Offset, - Offset + ChunkSize - 1, - Chunk.Error); + throw std::system_error(Ec, fmt::format("Failed to rename directory from '{}' to '{}'", AbsPath, Dest)); + } + } + for (const std::filesystem::path& AbsPath : DirContent.Files) + { + std::filesystem::path Dest = MakeSafeAbsolutePath(ServerStateDir / AbsPath.filename()); + std::error_code Ec = RenameFileWithRetry(AbsPath, Dest); + if (Ec) + { + throw std::system_error(Ec, fmt::format("Failed to rename file from '{}' to '{}'", AbsPath, Dest)); } - - Writer.Write(Chunk.Content.GetData(), Chunk.Content.GetSize(), Offset); - Offset += ChunkSize; } - Writer.Flush(); + ZEN_DEBUG("Cleaning temp dir '{}'", m_Config.TempDir); + CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, TempDir); + } + else + { + // Slow path: TempDir and ServerStateDir are on different filesystems, so rename + // would fail. Copy the tree instead and clean up the temp files afterwards. + ZEN_DEBUG("TempDir and ServerStateDir are on different filesystems - using CopyTree"); + CopyTree(TempDir, ServerStateDir, {.EnableClone = true}); + ZEN_DEBUG("Cleaning temp dir '{}'", m_Config.TempDir); + CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, TempDir); } - } - - // Downloaded successfully - swap into ServerStateDir - ZEN_DEBUG("Wiping server state '{}'", m_Config.ServerStateDir); - CleanDirectory(m_Config.ServerStateDir, ForceRemoveReadOnlyFiles); - // If the two paths share at least one common component they are on the same drive/volume - // and atomic renames will succeed. Otherwise fall back to a full copy. - auto [ItTmp, ItState] = - std::mismatch(m_Config.TempDir.begin(), m_Config.TempDir.end(), m_Config.ServerStateDir.begin(), m_Config.ServerStateDir.end()); - if (ItTmp != m_Config.TempDir.begin()) - { - // Fast path: atomic renames - no data copying needed - for (const std::filesystem::directory_entry& Entry : std::filesystem::directory_iterator(m_Config.TempDir)) + // TODO: This could perhaps be done more efficently, but ok for now + DirectoryContent DirContent; + GetDirectoryContent(*m_Threading.WorkerPool, + ServerStateDir, + DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::Recursive | + DirectoryContentFlags::IncludeFileSizes | DirectoryContentFlags::IncludeModificationTick, + DirContent); + + CbObjectWriter HydrateState; + HydrateState.BeginArray("Files"); + for (size_t FileIndex = 0; FileIndex < DirContent.Files.size(); FileIndex++) { - std::filesystem::path Dest = MakeSafeAbsolutePath(m_Config.ServerStateDir / Entry.path().filename()); - if (Entry.is_directory()) + std::filesystem::path RelativePath = FastRelativePath(ServerStateDir, DirContent.Files[FileIndex]); + + if (auto It = EntryLookup.find(RelativePath.generic_string()); It != EntryLookup.end()) { - RenameDirectory(Entry.path(), Dest); + HydrateState.BeginObject(); + { + HydrateState << "Path" << RelativePath.generic_string(); + HydrateState << "Size" << DirContent.FileSizes[FileIndex]; + HydrateState << "ModTick" << DirContent.FileModificationTicks[FileIndex]; + HydrateState << "Hash" << Entries[It->second].Hash; + } + HydrateState.EndObject(); } else { - RenameFile(Entry.path(), Dest); + ZEN_ASSERT(false); } } - ZEN_DEBUG("Cleaning temp dir '{}'", m_Config.TempDir); - CleanDirectory(m_Config.TempDir, ForceRemoveReadOnlyFiles); + HydrateState.EndArray(); + + CbObject StateObject = HydrateState.Save(); + + ZEN_INFO("Hydration of module '{}' complete to folder '{}'. {} ({}) files in {}", + m_Config.ModuleId, + m_Config.ServerStateDir, + DownloadedFiles, + NiceBytes(DownloadedBytes), + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + + return StateObject; } - else + catch (const std::exception& Ex) { - // Slow path: TempDir and ServerStateDir are on different filesystems, so rename - // would fail. Copy the tree instead and clean up the temp files afterwards. - ZEN_DEBUG("TempDir and ServerStateDir are on different filesystems - using CopyTree"); - CopyTree(m_Config.TempDir, m_Config.ServerStateDir, {.EnableClone = true}); + ZEN_WARN("Hydration of module '{}' failed: {}. Cleaning server state '{}'", + m_Config.ModuleId, + Ex.what(), + m_Config.ServerStateDir); + CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, ServerStateDir); ZEN_DEBUG("Cleaning temp dir '{}'", m_Config.TempDir); - CleanDirectory(m_Config.TempDir, ForceRemoveReadOnlyFiles); + CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, TempDir); + return {}; } - - ZEN_INFO("Hydration complete from folder '{}'", FolderName); } - catch (std::exception& Ex) + + void IncrementalHydrator::Obliterate() { - ZEN_WARN("S3 hydration failed: {}. Will wipe any partially installed state.", Ex.what()); + const std::filesystem::path ServerStateDir = MakeSafeAbsolutePath(m_Config.ServerStateDir); + const std::filesystem::path TempDir = MakeSafeAbsolutePath(m_Config.TempDir); + + try + { + ParallelWork Work(*m_Threading.AbortFlag, *m_Threading.PauseFlag, WorkerThreadPool::EMode::EnableBacklog); + m_Storage->Delete(Work, *m_Threading.WorkerPool); + Work.Wait(); + } + catch (const std::exception& Ex) + { + ZEN_WARN("Failed to delete backend storage for module '{}': {}. Proceeding with local cleanup.", m_Config.ModuleId, Ex.what()); + } - // We don't do the clean right here to avoid potentially running into double-throws - WipeServerState = true; + CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, ServerStateDir); + CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, TempDir); } - if (WipeServerState) +} // namespace hydration_impl + +/////////////////////////////////////////////////////////////////////////// +// HydrationBase subclasses - own hub-wide backend state, hand per-module +// storages the exact inputs they need in CreateHydrator. + +class FileHydration : public HydrationBase +{ +public: + explicit FileHydration(const Configuration& Config); + + virtual std::unique_ptr<HydrationStrategyBase> CreateHydrator(const HydrationConfig& Config) override; + +private: + std::filesystem::path m_StorageRoot; +}; + +class S3Hydration : public HydrationBase +{ +public: + explicit S3Hydration(const Configuration& Config); + + virtual std::unique_ptr<HydrationStrategyBase> CreateHydrator(const HydrationConfig& Config) override; + +private: + std::string m_Bucket; + std::string m_Region; + std::string m_Endpoint; + bool m_PathStyle = false; + std::string m_KeyPrefixRoot; + SigV4Credentials m_Credentials; + Ref<ImdsCredentialProvider> m_CredentialProvider; + std::unique_ptr<S3Client> m_Client; + uint64_t m_DefaultMultipartChunkSize; +}; + +/////////////////////////////////////////////////////////////////////////// +// Implementations + +FileHydration::FileHydration(const Configuration& Config) +{ + if (!Config.TargetSpecification.empty()) + { + m_StorageRoot = Utf8ToWide(Config.TargetSpecification.substr(hydration_impl::FileStorage::Prefix.length())); + if (m_StorageRoot.empty()) + { + throw zen::runtime_error("Hydration config 'file' type requires a directory path"); + } + } + else { - ZEN_DEBUG("Cleaning server state '{}'", m_Config.ServerStateDir); - CleanDirectory(m_Config.ServerStateDir, ForceRemoveReadOnlyFiles); - ZEN_DEBUG("Cleaning temp dir '{}'", m_Config.TempDir); - CleanDirectory(m_Config.TempDir, ForceRemoveReadOnlyFiles); + CbObjectView Settings = Config.Options["settings"].AsObjectView(); + std::string_view Path = Settings["path"].AsString(); + if (Path.empty()) + { + throw zen::runtime_error("Hydration config 'file' type requires 'settings.path'"); + } + m_StorageRoot = Utf8ToWide(std::string(Path)); } + MakeSafeAbsolutePathInPlace(m_StorageRoot); } std::unique_ptr<HydrationStrategyBase> -CreateHydrator(const HydrationConfig& Config) +FileHydration::CreateHydrator(const HydrationConfig& Config) { - if (StrCaseCompare(Config.TargetSpecification.substr(0, FileHydratorPrefix.length()), FileHydratorPrefix) == 0) + using namespace hydration_impl; + return std::make_unique<IncrementalHydrator>(Config, std::make_unique<FileStorage>(m_StorageRoot / Config.ModuleId)); +} + +S3Hydration::S3Hydration(const Configuration& Config) +{ + using namespace hydration_impl; + + CbObjectView Settings = Config.Options["settings"].AsObjectView(); + std::string_view Spec; + if (!Config.TargetSpecification.empty()) { - std::unique_ptr<HydrationStrategyBase> Hydrator = std::make_unique<FileHydrator>(); - Hydrator->Configure(Config); - return Hydrator; + Spec = Config.TargetSpecification; + Spec.remove_prefix(S3Storage::Prefix.size()); } - if (StrCaseCompare(Config.TargetSpecification.substr(0, S3HydratorPrefix.length()), S3HydratorPrefix) == 0) + else { - std::unique_ptr<HydrationStrategyBase> Hydrator = std::make_unique<S3Hydrator>(); - Hydrator->Configure(Config); - return Hydrator; + std::string_view Uri = Settings["uri"].AsString(); + if (Uri.empty()) + { + throw zen::runtime_error("Incremental S3 hydration config requires 'settings.uri'"); + } + Spec = Uri; + Spec.remove_prefix(S3Storage::Prefix.size()); } - throw std::runtime_error(fmt::format("Unknown hydration strategy: {}", Config.TargetSpecification)); -} -#if ZEN_WITH_TESTS + size_t SlashPos = Spec.find('/'); + m_Bucket = std::string(SlashPos != std::string_view::npos ? Spec.substr(0, SlashPos) : Spec); + m_KeyPrefixRoot = SlashPos != std::string_view::npos ? std::string(Spec.substr(SlashPos + 1)) : std::string{}; -namespace { + if (m_Bucket.empty()) + { + throw zen::runtime_error("Incremental S3 hydration config requires a bucket name"); + } + + std::string Region = std::string(Settings["region"].AsString()); + if (Region.empty()) + { + Region = GetEnvVariable("AWS_DEFAULT_REGION"); + } + if (Region.empty()) + { + Region = GetEnvVariable("AWS_REGION"); + } + if (Region.empty()) + { + Region = "us-east-1"; + } + m_Region = std::move(Region); + + std::string_view Endpoint = Settings["endpoint"].AsString(); + if (!Endpoint.empty()) + { + m_Endpoint = std::string(Endpoint); + m_PathStyle = Settings["path-style"].AsBool(); + } + + std::string AccessKeyId = GetEnvVariable("AWS_ACCESS_KEY_ID"); + if (AccessKeyId.empty()) + { + m_CredentialProvider = Ref<ImdsCredentialProvider>(new ImdsCredentialProvider({})); + } + else + { + m_Credentials.AccessKeyId = std::move(AccessKeyId); + m_Credentials.SecretAccessKey = GetEnvVariable("AWS_SECRET_ACCESS_KEY"); + m_Credentials.SessionToken = GetEnvVariable("AWS_SESSION_TOKEN"); + } + + m_DefaultMultipartChunkSize = Settings["chunksize"].AsUInt64(S3Storage::DefaultMultipartChunkSize); - /// Scoped RAII helper to set/restore a single environment variable within a test. - /// Used to configure AWS credentials for each S3 test's MinIO instance - /// without polluting the global environment. - struct ScopedEnvVar + S3ClientOptions ClientOptions; + ClientOptions.BucketName = m_Bucket; + ClientOptions.Region = m_Region; + ClientOptions.Endpoint = m_Endpoint; + ClientOptions.PathStyle = m_PathStyle; + if (m_CredentialProvider) { - std::string m_Name; - std::optional<std::string> m_OldValue; // nullopt = was not set; "" = was set to empty string + ClientOptions.CredentialProvider = m_CredentialProvider; + } + else + { + ClientOptions.Credentials = m_Credentials; + } + ClientOptions.HttpSettings.MaximumInMemoryDownloadSize = 16u * 1024u; + + m_Client = std::make_unique<S3Client>(ClientOptions); +} + +std::unique_ptr<HydrationStrategyBase> +S3Hydration::CreateHydrator(const HydrationConfig& Config) +{ + using namespace hydration_impl; + std::string KeyPrefix = m_KeyPrefixRoot.empty() ? std::string(Config.ModuleId) : fmt::format("{}/{}", m_KeyPrefixRoot, Config.ModuleId); + return std::make_unique<IncrementalHydrator>( + Config, + std::make_unique<S3Storage>(*m_Client, std::move(KeyPrefix), Config.TempDir, m_DefaultMultipartChunkSize)); +} - ScopedEnvVar(std::string_view Name, std::string_view Value) : m_Name(Name) +std::unique_ptr<HydrationBase> +InitHydration(const HydrationBase::Configuration& Config) +{ + using namespace hydration_impl; + + if (!Config.TargetSpecification.empty()) + { + if (StrCaseCompare(Config.TargetSpecification.substr(0, FileStorage::Prefix.length()), FileStorage::Prefix) == 0) { -# if ZEN_PLATFORM_WINDOWS - // Use the raw API so we can distinguish "not set" (ERROR_ENVVAR_NOT_FOUND) - // from "set to empty string" (returns 0 with no error). - char Buf[1]; - DWORD Len = GetEnvironmentVariableA(m_Name.c_str(), Buf, sizeof(Buf)); - if (Len == 0 && GetLastError() == ERROR_ENVVAR_NOT_FOUND) - { - m_OldValue = std::nullopt; - } - else - { - // Len == 0 with no error: variable exists but is empty. - // Len > sizeof(Buf): value is non-empty; Len is the required buffer size - // (including null terminator) - allocate and re-read. - std::string Old(Len == 0 ? 0 : Len - 1, '\0'); - if (Len > sizeof(Buf)) - { - GetEnvironmentVariableA(m_Name.c_str(), Old.data(), Len); - } - m_OldValue = std::move(Old); - } - SetEnvironmentVariableA(m_Name.c_str(), std::string(Value).c_str()); -# else - // getenv returns nullptr when not set, "" when set to empty string. - const char* Existing = getenv(m_Name.c_str()); - m_OldValue = Existing ? std::optional<std::string>(Existing) : std::nullopt; - setenv(m_Name.c_str(), std::string(Value).c_str(), 1); -# endif + return std::make_unique<FileHydration>(Config); } - ~ScopedEnvVar() + if (StrCaseCompare(Config.TargetSpecification.substr(0, S3Storage::Prefix.length()), S3Storage::Prefix) == 0) { -# if ZEN_PLATFORM_WINDOWS - SetEnvironmentVariableA(m_Name.c_str(), m_OldValue.has_value() ? m_OldValue->c_str() : nullptr); -# else - if (m_OldValue.has_value()) - { - setenv(m_Name.c_str(), m_OldValue->c_str(), 1); - } - else - { - unsetenv(m_Name.c_str()); - } -# endif + return std::make_unique<S3Hydration>(Config); } + throw zen::runtime_error("Unknown hydration strategy: {}", Config.TargetSpecification); + } + + std::string_view Type = Config.Options["type"].AsString(); + if (Type == FileStorage::Type) + { + return std::make_unique<FileHydration>(Config); + } + if (Type == S3Storage::Type) + { + return std::make_unique<S3Hydration>(Config); + } + if (!Type.empty()) + { + throw zen::runtime_error("Unknown hydration target type '{}'", Type); + } + throw zen::runtime_error("No hydration target configured"); +} + +#if ZEN_WITH_TESTS + +namespace { + + struct TestThreading + { + WorkerThreadPool WorkerPool; + std::atomic<bool> AbortFlag{false}; + std::atomic<bool> PauseFlag{false}; + HydrationConfig::ThreadingOptions Options{.WorkerPool = &WorkerPool, .AbortFlag = &AbortFlag, .PauseFlag = &PauseFlag}; + + explicit TestThreading(int ThreadCount) : WorkerPool(ThreadCount) {} }; /// Create a small file hierarchy under BaseDir: @@ -593,10 +1222,10 @@ namespace { /// subdir/file_b.bin /// subdir/nested/file_c.bin /// Returns a vector of (relative path, content) pairs for later verification. - std::vector<std::pair<std::filesystem::path, IoBuffer>> CreateTestTree(const std::filesystem::path& BaseDir) - { - std::vector<std::pair<std::filesystem::path, IoBuffer>> Files; + typedef std::vector<std::pair<std::filesystem::path, IoBuffer>> TestFileList; + TestFileList AddTestFiles(const std::filesystem::path& BaseDir, TestFileList& Files) + { auto AddFile = [&](std::filesystem::path RelPath, IoBuffer Content) { std::filesystem::path FullPath = BaseDir / RelPath; CreateDirectories(FullPath.parent_path()); @@ -607,6 +1236,36 @@ namespace { AddFile("file_a.bin", CreateSemiRandomBlob(1024)); AddFile("subdir/file_b.bin", CreateSemiRandomBlob(2048)); AddFile("subdir/nested/file_c.bin", CreateSemiRandomBlob(512)); + AddFile("subdir/nested/file_d.bin", CreateSemiRandomBlob(512)); + AddFile("subdir/nested/file_e.bin", CreateSemiRandomBlob(512)); + AddFile("subdir/nested/file_f.bin", CreateSemiRandomBlob(512)); + + return Files; + } + + TestFileList CreateSmallTestTree(const std::filesystem::path& BaseDir) + { + TestFileList Files; + AddTestFiles(BaseDir, Files); + return Files; + } + + TestFileList CreateTestTree(const std::filesystem::path& BaseDir) + { + TestFileList Files; + AddTestFiles(BaseDir, Files); + + auto AddFile = [&](std::filesystem::path RelPath, IoBuffer Content) { + std::filesystem::path FullPath = BaseDir / RelPath; + CreateDirectories(FullPath.parent_path()); + WriteFile(FullPath, Content); + Files.emplace_back(std::move(RelPath), std::move(Content)); + }; + + AddFile("subdir/nested/medium.bulk", CreateSemiRandomBlob(256u * 1024u)); + AddFile("subdir/nested/big.bulk", CreateSemiRandomBlob(512u * 1024u)); + AddFile("subdir/nested/huge.bulk", CreateSemiRandomBlob(9u * 1024u * 1024u)); + AddFile("subdir/nested/biggest.bulk", CreateSemiRandomBlob(63u * 1024u * 1024u)); return Files; } @@ -644,35 +1303,27 @@ TEST_CASE("hydration.file.dehydrate_hydrate") CreateDirectories(HydrationTemp); const std::string ModuleId = "testmodule"; - auto TestFiles = CreateTestTree(ServerStateDir); + auto TestFiles = CreateSmallTestTree(ServerStateDir); + + auto Hydration = InitHydration({.TargetSpecification = "file://" + HydrationStore.string()}); - HydrationConfig Config; - Config.ServerStateDir = ServerStateDir; - Config.TempDir = HydrationTemp; - Config.ModuleId = ModuleId; - Config.TargetSpecification = "file://" + HydrationStore.string(); + HydrationConfig Config{.ServerStateDir = ServerStateDir, .TempDir = HydrationTemp, .ModuleId = ModuleId}; // Dehydrate: copy server state to file store - { - std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config); - Hydrator->Dehydrate(); - } + Hydration->CreateHydrator(Config)->Dehydrate(CbObject()); // Verify the module folder exists in the store and ServerStateDir was wiped CHECK(std::filesystem::exists(HydrationStore / ModuleId)); CHECK(std::filesystem::is_empty(ServerStateDir)); // Hydrate: restore server state from file store - { - std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config); - Hydrator->Hydrate(); - } + Hydration->CreateHydrator(Config)->Hydrate(); // Verify restored contents match the original VerifyTree(ServerStateDir, TestFiles); } -TEST_CASE("hydration.file.dehydrate_cleans_server_state") +TEST_CASE("hydration.file.hydrate_overwrites_existing_state") { ScopedTemporaryDirectory TempDir; @@ -683,22 +1334,25 @@ TEST_CASE("hydration.file.dehydrate_cleans_server_state") CreateDirectories(HydrationStore); CreateDirectories(HydrationTemp); - CreateTestTree(ServerStateDir); + auto TestFiles = CreateSmallTestTree(ServerStateDir); - HydrationConfig Config; - Config.ServerStateDir = ServerStateDir; - Config.TempDir = HydrationTemp; - Config.ModuleId = "testmodule"; - Config.TargetSpecification = "file://" + HydrationStore.string(); + auto Hydration = InitHydration({.TargetSpecification = "file://" + HydrationStore.string()}); - std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config); - Hydrator->Dehydrate(); + HydrationConfig Config{.ServerStateDir = ServerStateDir, .TempDir = HydrationTemp, .ModuleId = "testmodule"}; - // FileHydrator::Dehydrate() must wipe ServerStateDir when done - CHECK(std::filesystem::is_empty(ServerStateDir)); + Hydration->CreateHydrator(Config)->Dehydrate(CbObject()); + + // Put a stale file in ServerStateDir to simulate leftover state + WriteFile(ServerStateDir / "stale.bin", CreateSemiRandomBlob(256)); + + // Hydrate - must wipe stale file and restore original + Hydration->CreateHydrator(Config)->Hydrate(); + + CHECK_FALSE(std::filesystem::exists(ServerStateDir / "stale.bin")); + VerifyTree(ServerStateDir, TestFiles); } -TEST_CASE("hydration.file.hydrate_overwrites_existing_state") +TEST_CASE("hydration.file.excluded_files_not_dehydrated") { ScopedTemporaryDirectory TempDir; @@ -709,31 +1363,70 @@ TEST_CASE("hydration.file.hydrate_overwrites_existing_state") CreateDirectories(HydrationStore); CreateDirectories(HydrationTemp); - auto TestFiles = CreateTestTree(ServerStateDir); + auto TestFiles = CreateSmallTestTree(ServerStateDir); - HydrationConfig Config; - Config.ServerStateDir = ServerStateDir; - Config.TempDir = HydrationTemp; - Config.ModuleId = "testmodule"; - Config.TargetSpecification = "file://" + HydrationStore.string(); + // Add files that the dehydrator should skip + WriteFile(ServerStateDir / "reserve.gc", CreateSemiRandomBlob(64)); + CreateDirectories(ServerStateDir / ".sentry-native"); + WriteFile(ServerStateDir / ".sentry-native" / "db.lock", CreateSemiRandomBlob(32)); + WriteFile(ServerStateDir / ".sentry-native" / "breadcrumb.json", CreateSemiRandomBlob(128)); - // Dehydrate the original state - { - std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config); - Hydrator->Dehydrate(); - } + auto Hydration = InitHydration({.TargetSpecification = "file://" + HydrationStore.string()}); - // Put a stale file in ServerStateDir to simulate leftover state - WriteFile(ServerStateDir / "stale.bin", CreateSemiRandomBlob(256)); + HydrationConfig Config{.ServerStateDir = ServerStateDir, .TempDir = HydrationTemp, .ModuleId = "testmodule_excl"}; - // Hydrate - must wipe stale file and restore original - { - std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config); - Hydrator->Hydrate(); - } + Hydration->CreateHydrator(Config)->Dehydrate(CbObject()); - CHECK_FALSE(std::filesystem::exists(ServerStateDir / "stale.bin")); + // Hydrate into a clean directory + CleanDirectory(ServerStateDir, true); + Hydration->CreateHydrator(Config)->Hydrate(); + + // Normal files must be restored VerifyTree(ServerStateDir, TestFiles); + // Excluded files must NOT be restored + CHECK_FALSE(std::filesystem::exists(ServerStateDir / "reserve.gc")); + CHECK_FALSE(std::filesystem::exists(ServerStateDir / ".sentry-native")); +} + +// --------------------------------------------------------------------------- +// FileHydrator obliterate test +// --------------------------------------------------------------------------- + +TEST_CASE("hydration.file.obliterate") +{ + ScopedTemporaryDirectory TempDir; + + std::filesystem::path ServerStateDir = TempDir.Path() / "server_state"; + std::filesystem::path HydrationStore = TempDir.Path() / "hydration_store"; + std::filesystem::path HydrationTemp = TempDir.Path() / "hydration_temp"; + CreateDirectories(ServerStateDir); + CreateDirectories(HydrationStore); + CreateDirectories(HydrationTemp); + + const std::string ModuleId = "obliterate_test"; + CreateSmallTestTree(ServerStateDir); + + auto Hydration = InitHydration({.TargetSpecification = "file://" + HydrationStore.string()}); + + HydrationConfig Config{.ServerStateDir = ServerStateDir, .TempDir = HydrationTemp, .ModuleId = ModuleId}; + + // Dehydrate so the backend store has data + Hydration->CreateHydrator(Config)->Dehydrate(CbObject()); + CHECK(std::filesystem::exists(HydrationStore / ModuleId)); + + // Put some files back in ServerStateDir and TempDir to verify cleanup + CreateSmallTestTree(ServerStateDir); + WriteFile(HydrationTemp / "leftover.tmp", CreateSemiRandomBlob(64)); + + // Obliterate + Hydration->CreateHydrator(Config)->Obliterate(); + + // Backend store directory deleted + CHECK_FALSE(std::filesystem::exists(HydrationStore / ModuleId)); + // ServerStateDir cleaned + CHECK(std::filesystem::is_empty(ServerStateDir)); + // TempDir cleaned + CHECK(std::filesystem::is_empty(HydrationTemp)); } // --------------------------------------------------------------------------- @@ -750,6 +1443,8 @@ TEST_CASE("hydration.file.concurrent") std::filesystem::path HydrationStore = TempDir.Path() / "hydration_store"; CreateDirectories(HydrationStore); + TestThreading Threading(8); + struct ModuleData { HydrationConfig Config; @@ -757,6 +1452,8 @@ TEST_CASE("hydration.file.concurrent") }; std::vector<ModuleData> Modules(kModuleCount); + auto Hydration = InitHydration({.TargetSpecification = "file://" + HydrationStore.string()}); + for (int I = 0; I < kModuleCount; ++I) { std::string ModuleId = fmt::format("file_concurrent_{}", I); @@ -765,11 +1462,11 @@ TEST_CASE("hydration.file.concurrent") CreateDirectories(StateDir); CreateDirectories(TempPath); - Modules[I].Config.ServerStateDir = StateDir; - Modules[I].Config.TempDir = TempPath; - Modules[I].Config.ModuleId = ModuleId; - Modules[I].Config.TargetSpecification = "file://" + HydrationStore.string(); - Modules[I].Files = CreateTestTree(StateDir); + Modules[I].Config.ServerStateDir = StateDir; + Modules[I].Config.TempDir = TempPath; + Modules[I].Config.ModuleId = ModuleId; + Modules[I].Config.Threading = Threading.Options; + Modules[I].Files = CreateSmallTestTree(StateDir); } // Concurrent dehydrate @@ -781,9 +1478,8 @@ TEST_CASE("hydration.file.concurrent") for (int I = 0; I < kModuleCount; ++I) { - Work.ScheduleWork(Pool, [&Config = Modules[I].Config](std::atomic<bool>&) { - std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config); - Hydrator->Dehydrate(); + Work.ScheduleWork(Pool, [&Hydration, &Config = Modules[I].Config](std::atomic<bool>&) { + Hydration->CreateHydrator(Config)->Dehydrate(CbObject()); }); } Work.Wait(); @@ -799,9 +1495,8 @@ TEST_CASE("hydration.file.concurrent") for (int I = 0; I < kModuleCount; ++I) { - Work.ScheduleWork(Pool, [&Config = Modules[I].Config](std::atomic<bool>&) { - std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config); - Hydrator->Hydrate(); + Work.ScheduleWork(Pool, [&Hydration, &Config = Modules[I].Config](std::atomic<bool>&) { + Hydration->CreateHydrator(Config)->Hydrate(); }); } Work.Wait(); @@ -818,14 +1513,14 @@ TEST_CASE("hydration.file.concurrent") // --------------------------------------------------------------------------- // S3Hydrator tests // -// Each test case spawns its own local MinIO instance (self-contained, no external setup needed). +// Each test case spawns a local MinIO instance (self-contained, no external setup needed). // The MinIO binary must be present in the same directory as the test executable (copied by xmake). // --------------------------------------------------------------------------- TEST_CASE("hydration.s3.dehydrate_hydrate") { MinioProcessOptions MinioOpts; - MinioOpts.Port = 19010; + MinioOpts.Port = 19011; MinioProcess Minio(MinioOpts); Minio.SpawnMinioServer(); Minio.CreateBucket("zen-hydration-test"); @@ -840,168 +1535,45 @@ TEST_CASE("hydration.s3.dehydrate_hydrate") CreateDirectories(ServerStateDir); CreateDirectories(HydrationTemp); - const std::string ModuleId = "s3test_roundtrip"; - auto TestFiles = CreateTestTree(ServerStateDir); - - HydrationConfig Config; - Config.ServerStateDir = ServerStateDir; - Config.TempDir = HydrationTemp; - Config.ModuleId = ModuleId; - Config.TargetSpecification = "s3://zen-hydration-test"; - Config.S3Endpoint = Minio.Endpoint(); - Config.S3PathStyle = true; - - // Dehydrate: upload server state to MinIO - { - std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config); - Hydrator->Dehydrate(); - } - - // Wipe server state - CleanDirectory(ServerStateDir, true); - CHECK(std::filesystem::is_empty(ServerStateDir)); - - // Hydrate: download from MinIO back to server state + HydrationBase::Configuration BaseConfig; { - std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config); - Hydrator->Hydrate(); + std::string ConfigJson = + fmt::format(R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test","endpoint":"{}","path-style":true}}}})", + Minio.Endpoint()); + std::string ParseError; + CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError); + ZEN_ASSERT(ParseError.empty() && Root.IsObject()); + BaseConfig.Options = std::move(Root).AsObject(); } + auto Hydration = InitHydration(BaseConfig); - // Verify restored contents match the original - VerifyTree(ServerStateDir, TestFiles); -} - -TEST_CASE("hydration.s3.current_state_json_selects_latest_folder") -{ - // Each Dehydrate() uploads files to a new timestamp-named folder and then overwrites - // current-state.json to point at that folder. Old folders are NOT deleted. - // Hydrate() must read current-state.json to determine which folder to restore from. - // - // This test verifies that: - // 1. After two dehydrations, Hydrate() restores from the second snapshot, not the first, - // confirming that current-state.json was updated between dehydrations. - // 2. current-state.json is updated to point at the second (latest) folder. - // 3. Hydrate() restores the v2 snapshot (identified by v2marker.bin), NOT the v1 snapshot. - - MinioProcessOptions MinioOpts; - MinioOpts.Port = 19011; - MinioProcess Minio(MinioOpts); - Minio.SpawnMinioServer(); - Minio.CreateBucket("zen-hydration-test"); - - ScopedEnvVar EnvAccessKey("AWS_ACCESS_KEY_ID", Minio.RootUser()); - ScopedEnvVar EnvSecretKey("AWS_SECRET_ACCESS_KEY", Minio.RootPassword()); - - ScopedTemporaryDirectory TempDir; - - std::filesystem::path ServerStateDir = TempDir.Path() / "server_state"; - std::filesystem::path HydrationTemp = TempDir.Path() / "hydration_temp"; - CreateDirectories(ServerStateDir); - CreateDirectories(HydrationTemp); - - const std::string ModuleId = "s3test_folder_select"; + HydrationConfig Config{.ServerStateDir = ServerStateDir, .TempDir = HydrationTemp, .ModuleId = "s3test_roundtrip"}; - HydrationConfig Config; - Config.ServerStateDir = ServerStateDir; - Config.TempDir = HydrationTemp; - Config.ModuleId = ModuleId; - Config.TargetSpecification = "s3://zen-hydration-test"; - Config.S3Endpoint = Minio.Endpoint(); - Config.S3PathStyle = true; + // Hydrate with no prior S3 state (first-boot path). Pre-populate ServerStateDir + // with a stale file to confirm the cleanup branch wipes it. + WriteFile(ServerStateDir / "stale.bin", CreateSemiRandomBlob(256)); + Hydration->CreateHydrator(Config)->Hydrate(); + CHECK(std::filesystem::is_empty(ServerStateDir)); // v1: dehydrate without a marker file - CreateTestTree(ServerStateDir); - { - std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config); - Hydrator->Dehydrate(); - } - - // ServerStateDir is now empty. Wait so the v2 timestamp folder name is strictly later - // (timestamp resolution is 1 ms, but macOS scheduler granularity requires a larger margin). - Sleep(100); + CreateSmallTestTree(ServerStateDir); + Hydration->CreateHydrator(Config)->Dehydrate(CbObject()); // v2: dehydrate WITH a marker file that only v2 has - CreateTestTree(ServerStateDir); + CreateSmallTestTree(ServerStateDir); WriteFile(ServerStateDir / "v2marker.bin", CreateSemiRandomBlob(64)); - { - std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config); - Hydrator->Dehydrate(); - } + Hydration->CreateHydrator(Config)->Dehydrate(CbObject()); - // Hydrate must restore v2 (current-state.json points to the v2 folder) + // Hydrate must restore v2 (the latest dehydrated state) CleanDirectory(ServerStateDir, true); - { - std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config); - Hydrator->Hydrate(); - } + Hydration->CreateHydrator(Config)->Hydrate(); - // v2 marker must be present - confirms current-state.json pointed to the v2 folder + // v2 marker must be present - confirms the second dehydration overwrote the first CHECK(std::filesystem::exists(ServerStateDir / "v2marker.bin")); - // Subdirectory hierarchy must also be intact CHECK(std::filesystem::exists(ServerStateDir / "subdir" / "file_b.bin")); CHECK(std::filesystem::exists(ServerStateDir / "subdir" / "nested" / "file_c.bin")); } -TEST_CASE("hydration.s3.module_isolation") -{ - // Two independent modules dehydrate/hydrate without interfering with each other. - // Uses VerifyTree with per-module byte content to detect cross-module data mixing. - MinioProcessOptions MinioOpts; - MinioOpts.Port = 19012; - MinioProcess Minio(MinioOpts); - Minio.SpawnMinioServer(); - Minio.CreateBucket("zen-hydration-test"); - - ScopedEnvVar EnvAccessKey("AWS_ACCESS_KEY_ID", Minio.RootUser()); - ScopedEnvVar EnvSecretKey("AWS_SECRET_ACCESS_KEY", Minio.RootPassword()); - - ScopedTemporaryDirectory TempDir; - - struct ModuleData - { - HydrationConfig Config; - std::vector<std::pair<std::filesystem::path, IoBuffer>> Files; - }; - - std::vector<ModuleData> Modules; - for (const char* ModuleId : {"s3test_iso_a", "s3test_iso_b"}) - { - std::filesystem::path StateDir = TempDir.Path() / ModuleId / "state"; - std::filesystem::path TempPath = TempDir.Path() / ModuleId / "temp"; - CreateDirectories(StateDir); - CreateDirectories(TempPath); - - ModuleData Data; - Data.Config.ServerStateDir = StateDir; - Data.Config.TempDir = TempPath; - Data.Config.ModuleId = ModuleId; - Data.Config.TargetSpecification = "s3://zen-hydration-test"; - Data.Config.S3Endpoint = Minio.Endpoint(); - Data.Config.S3PathStyle = true; - Data.Files = CreateTestTree(StateDir); - - std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Data.Config); - Hydrator->Dehydrate(); - - Modules.push_back(std::move(Data)); - } - - for (ModuleData& Module : Modules) - { - CleanDirectory(Module.Config.ServerStateDir, true); - std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Module.Config); - Hydrator->Hydrate(); - - // Each module's files must be independently restorable with correct byte content. - // If S3 key prefixes were mixed up, CreateSemiRandomBlob content would differ. - VerifyTree(Module.Config.ServerStateDir, Module.Files); - } -} - -// --------------------------------------------------------------------------- -// S3Hydrator concurrent test -// --------------------------------------------------------------------------- - TEST_CASE("hydration.s3.concurrent") { // N modules dehydrate and hydrate concurrently against MinIO. @@ -1015,7 +1587,10 @@ TEST_CASE("hydration.s3.concurrent") ScopedEnvVar EnvAccessKey("AWS_ACCESS_KEY_ID", Minio.RootUser()); ScopedEnvVar EnvSecretKey("AWS_SECRET_ACCESS_KEY", Minio.RootPassword()); - constexpr int kModuleCount = 4; + constexpr int kModuleCount = 6; + constexpr int kThreadCount = 4; + + TestThreading Threading(kThreadCount); ScopedTemporaryDirectory TempDir; @@ -1026,6 +1601,18 @@ TEST_CASE("hydration.s3.concurrent") }; std::vector<ModuleData> Modules(kModuleCount); + HydrationBase::Configuration BaseConfig; + { + std::string ConfigJson = + fmt::format(R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test","endpoint":"{}","path-style":true}}}})", + Minio.Endpoint()); + std::string ParseError; + CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError); + ZEN_ASSERT(ParseError.empty() && Root.IsObject()); + BaseConfig.Options = std::move(Root).AsObject(); + } + auto Hydration = InitHydration(BaseConfig); + for (int I = 0; I < kModuleCount; ++I) { std::string ModuleId = fmt::format("s3_concurrent_{}", I); @@ -1034,27 +1621,24 @@ TEST_CASE("hydration.s3.concurrent") CreateDirectories(StateDir); CreateDirectories(TempPath); - Modules[I].Config.ServerStateDir = StateDir; - Modules[I].Config.TempDir = TempPath; - Modules[I].Config.ModuleId = ModuleId; - Modules[I].Config.TargetSpecification = "s3://zen-hydration-test"; - Modules[I].Config.S3Endpoint = Minio.Endpoint(); - Modules[I].Config.S3PathStyle = true; - Modules[I].Files = CreateTestTree(StateDir); + Modules[I].Config.ServerStateDir = StateDir; + Modules[I].Config.TempDir = TempPath; + Modules[I].Config.ModuleId = ModuleId; + Modules[I].Config.Threading = Threading.Options; + Modules[I].Files = CreateTestTree(StateDir); } // Concurrent dehydrate { - WorkerThreadPool Pool(kModuleCount, "hydration_s3_dehy"); + WorkerThreadPool Pool(kThreadCount, "hydration_s3_dehy"); std::atomic<bool> AbortFlag{false}; std::atomic<bool> PauseFlag{false}; ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); for (int I = 0; I < kModuleCount; ++I) { - Work.ScheduleWork(Pool, [&Config = Modules[I].Config](std::atomic<bool>&) { - std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config); - Hydrator->Dehydrate(); + Work.ScheduleWork(Pool, [&Hydration, &Config = Modules[I].Config](std::atomic<bool>&) { + Hydration->CreateHydrator(Config)->Dehydrate(CbObject()); }); } Work.Wait(); @@ -1063,17 +1647,16 @@ TEST_CASE("hydration.s3.concurrent") // Concurrent hydrate { - WorkerThreadPool Pool(kModuleCount, "hydration_s3_hy"); + WorkerThreadPool Pool(kThreadCount, "hydration_s3_hy"); std::atomic<bool> AbortFlag{false}; std::atomic<bool> PauseFlag{false}; ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); for (int I = 0; I < kModuleCount; ++I) { - Work.ScheduleWork(Pool, [&Config = Modules[I].Config](std::atomic<bool>&) { + Work.ScheduleWork(Pool, [&Hydration, &Config = Modules[I].Config](std::atomic<bool>&) { CleanDirectory(Config.ServerStateDir, true); - std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config); - Hydrator->Hydrate(); + Hydration->CreateHydrator(Config)->Hydrate(); }); } Work.Wait(); @@ -1087,17 +1670,76 @@ TEST_CASE("hydration.s3.concurrent") } } -// --------------------------------------------------------------------------- -// S3Hydrator: no prior state (first-boot path) -// --------------------------------------------------------------------------- +TEST_CASE("hydration.s3.obliterate") +{ + MinioProcessOptions MinioOpts; + MinioOpts.Port = 19019; + MinioProcess Minio(MinioOpts); + Minio.SpawnMinioServer(); + Minio.CreateBucket("zen-hydration-test"); -TEST_CASE("hydration.s3.no_prior_state") + ScopedEnvVar EnvAccessKey("AWS_ACCESS_KEY_ID", Minio.RootUser()); + ScopedEnvVar EnvSecretKey("AWS_SECRET_ACCESS_KEY", Minio.RootPassword()); + + ScopedTemporaryDirectory TempDir; + + std::filesystem::path ServerStateDir = TempDir.Path() / "server_state"; + std::filesystem::path HydrationTemp = TempDir.Path() / "hydration_temp"; + CreateDirectories(ServerStateDir); + CreateDirectories(HydrationTemp); + + const std::string ModuleId = "s3test_obliterate"; + + HydrationBase::Configuration BaseConfig; + { + std::string ConfigJson = + fmt::format(R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test","endpoint":"{}","path-style":true}}}})", + Minio.Endpoint()); + std::string ParseError; + CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError); + ZEN_ASSERT(ParseError.empty() && Root.IsObject()); + BaseConfig.Options = std::move(Root).AsObject(); + } + auto Hydration = InitHydration(BaseConfig); + + HydrationConfig Config{.ServerStateDir = ServerStateDir, .TempDir = HydrationTemp, .ModuleId = ModuleId}; + + // Dehydrate to populate backend + CreateSmallTestTree(ServerStateDir); + Hydration->CreateHydrator(Config)->Dehydrate(CbObject()); + + auto ListModuleObjects = [&]() { + S3ClientOptions Opts; + Opts.BucketName = "zen-hydration-test"; + Opts.Endpoint = Minio.Endpoint(); + Opts.PathStyle = true; + Opts.Credentials.AccessKeyId = Minio.RootUser(); + Opts.Credentials.SecretAccessKey = Minio.RootPassword(); + S3Client Client(Opts); + return Client.ListObjects(ModuleId + "/"); + }; + + // Verify objects exist in S3 + CHECK(!ListModuleObjects().Objects.empty()); + + // Re-populate ServerStateDir and TempDir for cleanup verification + CreateSmallTestTree(ServerStateDir); + WriteFile(HydrationTemp / "leftover.tmp", CreateSemiRandomBlob(64)); + + // Obliterate + Hydration->CreateHydrator(Config)->Obliterate(); + + // Verify S3 objects deleted + CHECK(ListModuleObjects().Objects.empty()); + // Local directories cleaned + CHECK(std::filesystem::is_empty(ServerStateDir)); + CHECK(std::filesystem::is_empty(HydrationTemp)); +} + +TEST_CASE("hydration.s3.config_overrides") { - // Hydrate() against an empty bucket (first-boot scenario) must leave ServerStateDir empty. - // The "No state found in S3" path goes through the error-cleanup branch, which wipes - // ServerStateDir to ensure no partial or stale content is left for the server to start on. MinioProcessOptions MinioOpts; - MinioOpts.Port = 19014; + MinioOpts.Port = 19015; MinioProcess Minio(MinioOpts); Minio.SpawnMinioServer(); Minio.CreateBucket("zen-hydration-test"); @@ -1112,36 +1754,202 @@ TEST_CASE("hydration.s3.no_prior_state") CreateDirectories(ServerStateDir); CreateDirectories(HydrationTemp); - // Pre-populate ServerStateDir to confirm the wipe actually runs. - WriteFile(ServerStateDir / "stale.bin", CreateSemiRandomBlob(256)); + // Path prefix: "s3://bucket/some/prefix" stores objects under + // "some/prefix/<ModuleId>/..." rather than directly under "<ModuleId>/...". + { + auto TestFiles = CreateSmallTestTree(ServerStateDir); + + HydrationBase::Configuration BaseConfig; + { + std::string ConfigJson = fmt::format( + R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test/team/project","endpoint":"{}","path-style":true}}}})", + Minio.Endpoint()); + std::string ParseError; + CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError); + ZEN_ASSERT(ParseError.empty() && Root.IsObject()); + BaseConfig.Options = std::move(Root).AsObject(); + } + auto Hydration = InitHydration(BaseConfig); + + HydrationConfig Config{.ServerStateDir = ServerStateDir, .TempDir = HydrationTemp, .ModuleId = "s3test_prefix"}; - HydrationConfig Config; - Config.ServerStateDir = ServerStateDir; - Config.TempDir = HydrationTemp; - Config.ModuleId = "s3test_no_prior"; - Config.TargetSpecification = "s3://zen-hydration-test"; - Config.S3Endpoint = Minio.Endpoint(); - Config.S3PathStyle = true; + Hydration->CreateHydrator(Config)->Dehydrate(CbObject()); - std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config); - Hydrator->Hydrate(); + CleanDirectory(ServerStateDir, true); + + Hydration->CreateHydrator(Config)->Hydrate(); + + VerifyTree(ServerStateDir, TestFiles); + } + + // Region override: 'region' in Options["settings"] takes precedence over AWS_DEFAULT_REGION. + // AWS_DEFAULT_REGION is set to a bogus value; hydration must succeed using the region from Options. + { + CleanDirectory(ServerStateDir, true); + auto TestFiles = CreateSmallTestTree(ServerStateDir); - // ServerStateDir must be empty: the error path wipes it to prevent a server start - // against stale or partially-installed content. + ScopedEnvVar EnvRegion("AWS_DEFAULT_REGION", "wrong-region"); + + HydrationBase::Configuration BaseConfig; + { + std::string ConfigJson = fmt::format( + R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test","endpoint":"{}","path-style":true,"region":"us-east-1"}}}})", + Minio.Endpoint()); + std::string ParseError; + CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError); + ZEN_ASSERT(ParseError.empty() && Root.IsObject()); + BaseConfig.Options = std::move(Root).AsObject(); + } + auto Hydration = InitHydration(BaseConfig); + + HydrationConfig Config{.ServerStateDir = ServerStateDir, .TempDir = HydrationTemp, .ModuleId = "s3test_region_override"}; + + Hydration->CreateHydrator(Config)->Dehydrate(CbObject()); + + CleanDirectory(ServerStateDir, true); + + Hydration->CreateHydrator(Config)->Hydrate(); + + VerifyTree(ServerStateDir, TestFiles); + } +} + +TEST_CASE("hydration.s3.dehydrate_hydrate.performance" * doctest::skip()) +{ + MinioProcessOptions MinioOpts; + MinioOpts.Port = 19010; + MinioProcess Minio(MinioOpts); + Minio.SpawnMinioServer(); + Minio.CreateBucket("zen-hydration-test"); + + ScopedEnvVar EnvAccessKey("AWS_ACCESS_KEY_ID", Minio.RootUser()); + ScopedEnvVar EnvSecretKey("AWS_SECRET_ACCESS_KEY", Minio.RootPassword()); + + ScopedTemporaryDirectory TempDir; + + std::filesystem::path ServerStateDir = TempDir.Path() / "server_state"; + std::filesystem::path HydrationTemp = TempDir.Path() / "hydration_temp"; + CreateDirectories(ServerStateDir); + CreateDirectories(HydrationTemp); + + const std::string ModuleId = "s3test_performance"; + CopyTree("E:\\Dev\\hub\\brainrot\\20260402-225355-508", ServerStateDir, {.EnableClone = true}); + // auto TestFiles = CreateTestTree(ServerStateDir); + + TestThreading Threading(4); + + HydrationBase::Configuration BaseConfig; + { + std::string ConfigJson = + fmt::format(R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test","endpoint":"{}","path-style":true}}}})", + Minio.Endpoint()); + std::string ParseError; + CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError); + ZEN_ASSERT(ParseError.empty() && Root.IsObject()); + BaseConfig.Options = std::move(Root).AsObject(); + } + auto Hydration = InitHydration(BaseConfig); + + HydrationConfig Config{.ServerStateDir = ServerStateDir, + .TempDir = HydrationTemp, + .ModuleId = ModuleId, + .Threading = Threading.Options}; + + // Dehydrate: upload server state to MinIO + ZEN_INFO("============== DEHYDRATE =============="); + Hydration->CreateHydrator(Config)->Dehydrate(CbObject()); + + for (size_t I = 0; I < 1; I++) + { + // Wipe server state + CleanDirectory(ServerStateDir, true); + CHECK(std::filesystem::is_empty(ServerStateDir)); + + // Hydrate: download from MinIO back to server state + ZEN_INFO("=============== HYDRATE ==============="); + Hydration->CreateHydrator(Config)->Hydrate(); + } +} + +//#define REAL_DATA_PATH "E:\\Dev\\hub\\zenddc\\Zen" +//#define REAL_DATA_PATH "E:\\Dev\\hub\\brainrot\\20260402-225355-508" + +TEST_CASE("hydration.file.incremental") +{ + std::filesystem::path TmpPath; +# ifdef REAL_DATA_PATH + TmpPath = std::filesystem::path(REAL_DATA_PATH).parent_path() / "hub"; +# endif + ScopedTemporaryDirectory TempDir(TmpPath); + + std::filesystem::path ServerStateDir = TempDir.Path() / "server_state"; + std::filesystem::path HydrationStore = TempDir.Path() / "hydration_store"; + std::filesystem::path HydrationTemp = TempDir.Path() / "hydration_temp"; + CreateDirectories(ServerStateDir); + CreateDirectories(HydrationStore); + CreateDirectories(HydrationTemp); + + const std::string ModuleId = "testmodule"; + // auto TestFiles = CreateTestTree(ServerStateDir); + + TestThreading Threading(4); + + auto Hydration = InitHydration({.TargetSpecification = "file://" + HydrationStore.string()}); + + HydrationConfig Config{.ServerStateDir = ServerStateDir, + .TempDir = HydrationTemp, + .ModuleId = ModuleId, + .Threading = Threading.Options}; + + // Hydrate with no prior state + CbObject HydrationState = Hydration->CreateHydrator(Config)->Hydrate(); + CHECK_FALSE(HydrationState); + +# ifdef REAL_DATA_PATH + ZEN_INFO("Writing state data..."); + CopyTree(REAL_DATA_PATH, ServerStateDir, {.EnableClone = true}); + ZEN_INFO("Writing state data complete"); +# else + // Create test files and dehydrate + auto TestFiles = CreateTestTree(ServerStateDir); +# endif + Hydration->CreateHydrator(Config)->Dehydrate(HydrationState); CHECK(std::filesystem::is_empty(ServerStateDir)); + + // Hydrate: restore from file store + HydrationState = Hydration->CreateHydrator(Config)->Hydrate(); +# ifndef REAL_DATA_PATH + VerifyTree(ServerStateDir, TestFiles); +# endif + // Dehydrate again with cached state (should skip re-uploading unchanged files) + Hydration->CreateHydrator(Config)->Dehydrate(HydrationState); + CHECK(std::filesystem::is_empty(ServerStateDir)); + + // Hydrate one more time to confirm second dehydrate produced valid state + HydrationState = Hydration->CreateHydrator(Config)->Hydrate(); + + // Replace files and dehydrate + TestFiles = CreateTestTree(ServerStateDir); + Hydration->CreateHydrator(Config)->Dehydrate(HydrationState); + + // Hydrate one more time to confirm second dehydrate produced valid state + HydrationState = Hydration->CreateHydrator(Config)->Hydrate(); +# ifndef REAL_DATA_PATH + VerifyTree(ServerStateDir, TestFiles); +# endif // 0 + + // Dehydrate, nothing touched - no hashing, no upload + Hydration->CreateHydrator(Config)->Dehydrate(HydrationState); } // --------------------------------------------------------------------------- -// S3Hydrator: bucket path prefix in TargetSpecification +// S3Storage test // --------------------------------------------------------------------------- -TEST_CASE("hydration.s3.path_prefix") +TEST_CASE("hydration.s3.incremental") { - // TargetSpecification of the form "s3://bucket/some/prefix" stores objects under - // "some/prefix/<ModuleId>/..." rather than directly under "<ModuleId>/...". - // Tests the second branch of the m_KeyPrefix calculation in S3Hydrator::Configure(). MinioProcessOptions MinioOpts; - MinioOpts.Port = 19015; + MinioOpts.Port = 19017; MinioProcess Minio(MinioOpts); Minio.SpawnMinioServer(); Minio.CreateBucket("zen-hydration-test"); @@ -1149,36 +1957,95 @@ TEST_CASE("hydration.s3.path_prefix") ScopedEnvVar EnvAccessKey("AWS_ACCESS_KEY_ID", Minio.RootUser()); ScopedEnvVar EnvSecretKey("AWS_SECRET_ACCESS_KEY", Minio.RootPassword()); - ScopedTemporaryDirectory TempDir; + std::filesystem::path TmpPath; +# ifdef REAL_DATA_PATH + TmpPath = std::filesystem::path(REAL_DATA_PATH).parent_path() / "hub"; +# endif + ScopedTemporaryDirectory TempDir(TmpPath); std::filesystem::path ServerStateDir = TempDir.Path() / "server_state"; std::filesystem::path HydrationTemp = TempDir.Path() / "hydration_temp"; CreateDirectories(ServerStateDir); CreateDirectories(HydrationTemp); - std::vector<std::pair<std::filesystem::path, IoBuffer>> TestFiles = CreateTestTree(ServerStateDir); + const std::string ModuleId = "s3test_incremental"; - HydrationConfig Config; - Config.ServerStateDir = ServerStateDir; - Config.TempDir = HydrationTemp; - Config.ModuleId = "s3test_prefix"; - Config.TargetSpecification = "s3://zen-hydration-test/team/project"; - Config.S3Endpoint = Minio.Endpoint(); - Config.S3PathStyle = true; + TestThreading Threading(8); + HydrationBase::Configuration BaseConfig; { - std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config); - Hydrator->Dehydrate(); + std::string ConfigJson = + fmt::format(R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test","endpoint":"{}","path-style":true}}}})", + Minio.Endpoint()); + std::string ParseError; + CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError); + ZEN_ASSERT(ParseError.empty() && Root.IsObject()); + BaseConfig.Options = std::move(Root).AsObject(); } + auto Hydration = InitHydration(BaseConfig); - CleanDirectory(ServerStateDir, true); + HydrationConfig Config{.ServerStateDir = ServerStateDir, + .TempDir = HydrationTemp, + .ModuleId = ModuleId, + .Threading = Threading.Options}; + + // Hydrate with no prior state + CbObject HydrationState = Hydration->CreateHydrator(Config)->Hydrate(); + CHECK_FALSE(HydrationState); + +# ifdef REAL_DATA_PATH + ZEN_INFO("Writing state data..."); + CopyTree(REAL_DATA_PATH, ServerStateDir, {.EnableClone = true}); + ZEN_INFO("Writing state data complete"); +# else + // Create test files and dehydrate + auto TestFiles = CreateTestTree(ServerStateDir); +# endif + Hydration->CreateHydrator(Config)->Dehydrate(HydrationState); + CHECK(std::filesystem::is_empty(ServerStateDir)); + + // Hydrate: restore from S3 + HydrationState = Hydration->CreateHydrator(Config)->Hydrate(); +# ifndef REAL_DATA_PATH + VerifyTree(ServerStateDir, TestFiles); +# endif + // Dehydrate again with cached state (should skip re-uploading unchanged files) + Hydration->CreateHydrator(Config)->Dehydrate(HydrationState); + CHECK(std::filesystem::is_empty(ServerStateDir)); + // Hydrate one more time to confirm second dehydrate produced valid state + HydrationState = Hydration->CreateHydrator(Config)->Hydrate(); + + // Replace files and dehydrate + TestFiles = CreateTestTree(ServerStateDir); + Hydration->CreateHydrator(Config)->Dehydrate(HydrationState); + + // Hydrate one more time to confirm second dehydrate produced valid state + HydrationState = Hydration->CreateHydrator(Config)->Hydrate(); + +# ifndef REAL_DATA_PATH + VerifyTree(ServerStateDir, TestFiles); +# endif // 0 + + // Dehydrate, nothing touched - no hashing, no upload + Hydration->CreateHydrator(Config)->Dehydrate(HydrationState); +} + +TEST_CASE("hydration.create_hydrator_rejects_invalid_config") +{ + // Unknown TargetSpecification prefix + CHECK_THROWS(InitHydration({.TargetSpecification = "ftp://somewhere"})); + + // Unknown Options type { - std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config); - Hydrator->Hydrate(); + std::string ParseError; + CbFieldIterator Root = LoadCompactBinaryFromJson(R"({"type":"dynamodb"})", ParseError); + ZEN_ASSERT(ParseError.empty() && Root.IsObject()); + CHECK_THROWS(InitHydration({.Options = std::move(Root).AsObject()})); } - VerifyTree(ServerStateDir, TestFiles); + // Empty Options (no type field) + CHECK_THROWS(InitHydration({})); } TEST_SUITE_END(); diff --git a/src/zenserver/hub/hydration.h b/src/zenserver/hub/hydration.h index d29ffe5c0..0455dda91 100644 --- a/src/zenserver/hub/hydration.h +++ b/src/zenserver/hub/hydration.h @@ -2,10 +2,18 @@ #pragma once +#include <zencore/compactbinary.h> + +#include <atomic> #include <filesystem> +#include <memory> +#include <optional> +#include <string> namespace zen { +class WorkerThreadPool; + struct HydrationConfig { // Location of server state to hydrate/dehydrate @@ -14,14 +22,16 @@ struct HydrationConfig std::filesystem::path TempDir; // Module ID of the server state being hydrated/dehydrated std::string ModuleId; - // Back-end specific target specification (e.g. S3 bucket, file path, etc) - std::string TargetSpecification; - - // Optional S3 endpoint override (e.g. "http://localhost:9000" for MinIO). - std::string S3Endpoint; - // Use path-style S3 URLs (endpoint/bucket/key) instead of virtual-hosted-style - // (bucket.endpoint/key). Required for MinIO and other non-AWS endpoints. - bool S3PathStyle = false; + + struct ThreadingOptions + { + WorkerThreadPool* WorkerPool = nullptr; + std::atomic<bool>* AbortFlag = nullptr; + std::atomic<bool>* PauseFlag = nullptr; + }; + + // External threading for parallel I/O and hashing. If not set, work runs inline on the caller's thread. + std::optional<ThreadingOptions> Threading; }; /** @@ -30,18 +40,53 @@ struct HydrationConfig * An instance of this interface is used to perform hydration OR * dehydration of server state. It's expected to be used only once * and not reused. - * */ struct HydrationStrategyBase { virtual ~HydrationStrategyBase() = default; - virtual void Dehydrate() = 0; - virtual void Hydrate() = 0; - virtual void Configure(const HydrationConfig& Config) = 0; + // Upload server state to the configured target. ServerStateDir is wiped on success. + // On failure, ServerStateDir is left intact. + virtual void Dehydrate(const CbObject& CachedState) = 0; + + // Download state from the configured target into ServerStateDir. Returns cached state for the next Dehydrate. + // On failure, ServerStateDir is wiped and an empty CbObject is returned. + virtual CbObject Hydrate() = 0; + + // Delete all stored data for this module from the configured backend, then clean ServerStateDir and TempDir. + virtual void Obliterate() = 0; +}; + +/** + * @brief Hub-wide hydration backend + * + * Constructed once per hub via InitHydration. Holds the shared connection / client / + * credentials state for the configured backend (e.g. a single S3 client and IMDS + * credential provider shared by all modules). CreateHydrator produces a ready-to-use + * per-module HydrationStrategyBase that references the shared state - no per-module + * backend setup cost. + */ +class HydrationBase +{ +public: + struct Configuration + { + // Back-end specific target specification (e.g. "s3://bucket/prefix", "file:///path") + std::string TargetSpecification; + // Full config object (mutually exclusive with TargetSpecification) + CbObject Options; + }; + + virtual ~HydrationBase() = default; + + // Create a configured per-module hydrator, ready to call Hydrate/Dehydrate/Obliterate. + virtual std::unique_ptr<HydrationStrategyBase> CreateHydrator(const HydrationConfig& Config) = 0; }; -std::unique_ptr<HydrationStrategyBase> CreateHydrator(const HydrationConfig& Config); +// Factory: parses Config and returns the concrete backend (FileHydration or S3Hydration). +// Throws zen::runtime_error if the config cannot be resolved to a known backend or if +// backend-specific validation fails. +std::unique_ptr<HydrationBase> InitHydration(const HydrationBase::Configuration& Config); #if ZEN_WITH_TESTS void hydration_forcelink(); diff --git a/src/zenserver/hub/storageserverinstance.cpp b/src/zenserver/hub/storageserverinstance.cpp index 6b139dbf1..97edc5223 100644 --- a/src/zenserver/hub/storageserverinstance.cpp +++ b/src/zenserver/hub/storageserverinstance.cpp @@ -11,13 +11,15 @@ namespace zen { -StorageServerInstance::StorageServerInstance(ZenServerEnvironment& RunEnvironment, const Configuration& Config, std::string_view ModuleId) -: m_Config(Config) +StorageServerInstance::StorageServerInstance(ZenServerEnvironment& RunEnvironment, + HydrationBase& Hydration, + const Configuration& Config, + std::string_view ModuleId) +: m_Hydration(Hydration) +, m_Config(Config) , m_ModuleId(ModuleId) , m_ServerInstance(RunEnvironment, ZenServerInstance::ServerMode::kStorageServer) { - m_BaseDir = RunEnvironment.CreateChildDir(ModuleId); - m_TempDir = Config.HydrationTempPath / ModuleId; } StorageServerInstance::~StorageServerInstance() @@ -31,7 +33,7 @@ StorageServerInstance::SpawnServerProcess() m_ServerInstance.ResetDeadProcess(); m_ServerInstance.SetServerExecutablePath(GetRunningExecutablePath()); - m_ServerInstance.SetDataDir(m_BaseDir); + m_ServerInstance.SetDataDir(m_Config.StateDir); #if ZEN_PLATFORM_WINDOWS m_ServerInstance.SetJobObject(m_JobObject); #endif @@ -50,6 +52,36 @@ StorageServerInstance::SpawnServerProcess() { AdditionalOptions << " --config=\"" << MakeSafeAbsolutePath(m_Config.ConfigPath).string() << "\""; } + if (!m_Config.Malloc.empty()) + { + AdditionalOptions << " --malloc=" << m_Config.Malloc; + } + if (!m_Config.Trace.empty()) + { + AdditionalOptions << " --trace=" << m_Config.Trace; + } + if (!m_Config.TraceHost.empty()) + { + AdditionalOptions << " --tracehost=" << m_Config.TraceHost; + } + if (!m_Config.TraceFile.empty()) + { + constexpr std::string_view ModuleIdPattern = "{moduleid}"; + constexpr std::string_view PortPattern = "{port}"; + + std::string ResolvedTraceFile = m_Config.TraceFile; + for (size_t Pos = ResolvedTraceFile.find(ModuleIdPattern); Pos != std::string::npos; + Pos = ResolvedTraceFile.find(ModuleIdPattern, Pos)) + { + ResolvedTraceFile.replace(Pos, ModuleIdPattern.length(), m_ModuleId); + } + std::string PortStr = fmt::format("{}", m_Config.BasePort); + for (size_t Pos = ResolvedTraceFile.find(PortPattern); Pos != std::string::npos; Pos = ResolvedTraceFile.find(PortPattern, Pos)) + { + ResolvedTraceFile.replace(Pos, PortPattern.length(), PortStr); + } + AdditionalOptions << " --tracefile=\"" << ResolvedTraceFile << "\""; + } m_ServerInstance.SpawnServerAndWaitUntilReady(m_Config.BasePort, AdditionalOptions.ToView()); ZEN_DEBUG("Storage server instance for module '{}' started, listening on port {}", m_ModuleId, m_Config.BasePort); @@ -57,16 +89,15 @@ StorageServerInstance::SpawnServerProcess() m_ServerInstance.EnableShutdownOnDestroy(); } -void -StorageServerInstance::GetProcessMetrics(ProcessMetrics& OutMetrics) const +ProcessMetrics +StorageServerInstance::GetProcessMetrics() const { - OutMetrics.MemoryBytes = m_MemoryBytes.load(); - OutMetrics.KernelTimeMs = m_KernelTimeMs.load(); - OutMetrics.UserTimeMs = m_UserTimeMs.load(); - OutMetrics.WorkingSetSize = m_WorkingSetSize.load(); - OutMetrics.PeakWorkingSetSize = m_PeakWorkingSetSize.load(); - OutMetrics.PagefileUsage = m_PagefileUsage.load(); - OutMetrics.PeakPagefileUsage = m_PeakPagefileUsage.load(); + ProcessMetrics Metrics; + if (m_ServerInstance.IsRunning()) + { + zen::GetProcessMetrics(m_ServerInstance.GetProcessHandle(), Metrics); + } + return Metrics; } void @@ -78,7 +109,7 @@ StorageServerInstance::ProvisionLocked() return; } - ZEN_INFO("Provisioning storage server instance for module '{}', at '{}'", m_ModuleId, m_BaseDir); + ZEN_INFO("Provisioning storage server instance for module '{}', at '{}'", m_ModuleId, m_Config.StateDir); try { Hydrate(); @@ -88,7 +119,7 @@ StorageServerInstance::ProvisionLocked() { ZEN_WARN("Failed spawning server instance for module '{}', at '{}' during provisioning. Reason: {}", m_ModuleId, - m_BaseDir, + m_Config.StateDir, Ex.what()); throw; } @@ -118,6 +149,22 @@ StorageServerInstance::DeprovisionLocked() } void +StorageServerInstance::ObliterateLocked() +{ + if (m_ServerInstance.IsRunning()) + { + // m_ServerInstance.Shutdown() never throws. + m_ServerInstance.Shutdown(); + } + + std::atomic<bool> AbortFlag{false}; + std::atomic<bool> PauseFlag{false}; + HydrationConfig Config = MakeHydrationConfig(AbortFlag, PauseFlag); + std::unique_ptr<HydrationStrategyBase> Hydrator = m_Hydration.CreateHydrator(Config); + Hydrator->Obliterate(); +} + +void StorageServerInstance::HibernateLocked() { // Signal server to shut down, but keep data around for later wake @@ -147,7 +194,10 @@ StorageServerInstance::WakeLocked() } catch (const std::exception& Ex) { - ZEN_WARN("Failed spawning server instance for module '{}', at '{}' during waking. Reason: {}", m_ModuleId, m_BaseDir, Ex.what()); + ZEN_WARN("Failed spawning server instance for module '{}', at '{}' during waking. Reason: {}", + m_ModuleId, + m_Config.StateDir, + Ex.what()); throw; } } @@ -155,27 +205,34 @@ StorageServerInstance::WakeLocked() void StorageServerInstance::Hydrate() { - HydrationConfig Config{.ServerStateDir = m_BaseDir, - .TempDir = m_TempDir, - .ModuleId = m_ModuleId, - .TargetSpecification = m_Config.HydrationTargetSpecification}; - - std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config); - - Hydrator->Hydrate(); + std::atomic<bool> AbortFlag{false}; + std::atomic<bool> PauseFlag{false}; + HydrationConfig Config = MakeHydrationConfig(AbortFlag, PauseFlag); + std::unique_ptr<HydrationStrategyBase> Hydrator = m_Hydration.CreateHydrator(Config); + m_HydrationState = Hydrator->Hydrate(); } void StorageServerInstance::Dehydrate() { - HydrationConfig Config{.ServerStateDir = m_BaseDir, - .TempDir = m_TempDir, - .ModuleId = m_ModuleId, - .TargetSpecification = m_Config.HydrationTargetSpecification}; + std::atomic<bool> AbortFlag{false}; + std::atomic<bool> PauseFlag{false}; + HydrationConfig Config = MakeHydrationConfig(AbortFlag, PauseFlag); + std::unique_ptr<HydrationStrategyBase> Hydrator = m_Hydration.CreateHydrator(Config); + Hydrator->Dehydrate(m_HydrationState); +} - std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config); +HydrationConfig +StorageServerInstance::MakeHydrationConfig(std::atomic<bool>& AbortFlag, std::atomic<bool>& PauseFlag) +{ + HydrationConfig Config{.ServerStateDir = m_Config.StateDir, .TempDir = m_Config.TempDir, .ModuleId = m_ModuleId}; + if (m_Config.OptionalWorkerPool) + { + Config.Threading.emplace( + HydrationConfig::ThreadingOptions{.WorkerPool = m_Config.OptionalWorkerPool, .AbortFlag = &AbortFlag, .PauseFlag = &PauseFlag}); + } - Hydrator->Dehydrate(); + return Config; } StorageServerInstance::SharedLockedPtr::SharedLockedPtr() : m_Lock(nullptr), m_Instance(nullptr) @@ -249,25 +306,6 @@ StorageServerInstance::SharedLockedPtr::IsRunning() const return m_Instance->m_ServerInstance.IsRunning(); } -void -StorageServerInstance::UpdateMetricsLocked() -{ - if (m_ServerInstance.IsRunning()) - { - ProcessMetrics Metrics; - zen::GetProcessMetrics(m_ServerInstance.GetProcessHandle(), Metrics); - - m_MemoryBytes.store(Metrics.MemoryBytes); - m_KernelTimeMs.store(Metrics.KernelTimeMs); - m_UserTimeMs.store(Metrics.UserTimeMs); - m_WorkingSetSize.store(Metrics.WorkingSetSize); - m_PeakWorkingSetSize.store(Metrics.PeakWorkingSetSize); - m_PagefileUsage.store(Metrics.PagefileUsage); - m_PeakPagefileUsage.store(Metrics.PeakPagefileUsage); - } - // TODO: Resource metrics... -} - #if ZEN_WITH_TESTS void StorageServerInstance::SharedLockedPtr::TerminateForTesting() const @@ -363,6 +401,13 @@ StorageServerInstance::ExclusiveLockedPtr::Deprovision() } void +StorageServerInstance::ExclusiveLockedPtr::Obliterate() +{ + ZEN_ASSERT(m_Instance != nullptr); + m_Instance->ObliterateLocked(); +} + +void StorageServerInstance::ExclusiveLockedPtr::Hibernate() { ZEN_ASSERT(m_Instance != nullptr); diff --git a/src/zenserver/hub/storageserverinstance.h b/src/zenserver/hub/storageserverinstance.h index 94c47630c..c5917afc9 100644 --- a/src/zenserver/hub/storageserverinstance.h +++ b/src/zenserver/hub/storageserverinstance.h @@ -2,8 +2,9 @@ #pragma once -#include "resourcemetrics.h" +#include "hydration.h" +#include <zencore/compactbinary.h> #include <zenutil/zenserverprocess.h> #include <atomic> @@ -11,6 +12,8 @@ namespace zen { +class WorkerThreadPool; + /** * Storage Server Instance * @@ -24,21 +27,28 @@ public: struct Configuration { uint16_t BasePort; - std::filesystem::path HydrationTempPath; - std::string HydrationTargetSpecification; + std::filesystem::path StateDir; + std::filesystem::path TempDir; uint32_t HttpThreadCount = 0; // Automatic int CoreLimit = 0; // Automatic std::filesystem::path ConfigPath; + std::string Malloc; + std::string Trace; + std::string TraceHost; + std::string TraceFile; + + WorkerThreadPool* OptionalWorkerPool = nullptr; }; - StorageServerInstance(ZenServerEnvironment& RunEnvironment, const Configuration& Config, std::string_view ModuleId); + StorageServerInstance(ZenServerEnvironment& RunEnvironment, + HydrationBase& Hydration, + const Configuration& Config, + std::string_view ModuleId); ~StorageServerInstance(); - const ResourceMetrics& GetResourceMetrics() const { return m_ResourceMetrics; } - inline std::string_view GetModuleId() const { return m_ModuleId; } inline uint16_t GetBasePort() const { return m_Config.BasePort; } - void GetProcessMetrics(ProcessMetrics& OutMetrics) const; + ProcessMetrics GetProcessMetrics() const; #if ZEN_PLATFORM_WINDOWS void SetJobObject(JobObject* InJobObject) { m_JobObject = InJobObject; } @@ -68,15 +78,10 @@ public: } bool IsRunning() const; - const ResourceMetrics& GetResourceMetrics() const + ProcessMetrics GetProcessMetrics() const { ZEN_ASSERT(m_Instance); - return m_Instance->m_ResourceMetrics; - } - void UpdateMetrics() - { - ZEN_ASSERT(m_Instance); - return m_Instance->UpdateMetricsLocked(); + return m_Instance->GetProcessMetrics(); } #if ZEN_WITH_TESTS @@ -114,14 +119,9 @@ public: } bool IsRunning() const; - const ResourceMetrics& GetResourceMetrics() const - { - ZEN_ASSERT(m_Instance); - return m_Instance->m_ResourceMetrics; - } - void Provision(); void Deprovision(); + void Obliterate(); void Hibernate(); void Wake(); @@ -135,29 +135,18 @@ public: private: void ProvisionLocked(); void DeprovisionLocked(); + void ObliterateLocked(); void HibernateLocked(); void WakeLocked(); - void UpdateMetricsLocked(); - mutable RwLock m_Lock; + HydrationBase& m_Hydration; const Configuration m_Config; std::string m_ModuleId; ZenServerInstance m_ServerInstance; - std::filesystem::path m_BaseDir; - - std::filesystem::path m_TempDir; - ResourceMetrics m_ResourceMetrics; - - std::atomic<uint64_t> m_MemoryBytes = 0; - std::atomic<uint64_t> m_KernelTimeMs = 0; - std::atomic<uint64_t> m_UserTimeMs = 0; - std::atomic<uint64_t> m_WorkingSetSize = 0; - std::atomic<uint64_t> m_PeakWorkingSetSize = 0; - std::atomic<uint64_t> m_PagefileUsage = 0; - std::atomic<uint64_t> m_PeakPagefileUsage = 0; + CbObject m_HydrationState; #if ZEN_PLATFORM_WINDOWS JobObject* m_JobObject = nullptr; @@ -165,8 +154,9 @@ private: void SpawnServerProcess(); - void Hydrate(); - void Dehydrate(); + void Hydrate(); + void Dehydrate(); + HydrationConfig MakeHydrationConfig(std::atomic<bool>& AbortFlag, std::atomic<bool>& PauseFlag); friend class SharedLockedPtr; friend class ExclusiveLockedPtr; diff --git a/src/zenserver/hub/zenhubserver.cpp b/src/zenserver/hub/zenhubserver.cpp index 314031246..ebc2cf2f1 100644 --- a/src/zenserver/hub/zenhubserver.cpp +++ b/src/zenserver/hub/zenhubserver.cpp @@ -2,21 +2,29 @@ #include "zenhubserver.h" +#include "config/luaconfig.h" #include "frontend/frontend.h" #include "httphubservice.h" +#include "httpproxyhandler.h" #include "hub.h" +#include <zencore/compactbinary.h> #include <zencore/config.h> +#include <zencore/except.h> +#include <zencore/except_fmt.h> +#include <zencore/filesystem.h> #include <zencore/fmtutils.h> +#include <zencore/intmath.h> #include <zencore/memory/llm.h> #include <zencore/memory/memorytrace.h> #include <zencore/memory/tagtrace.h> #include <zencore/scopeguard.h> #include <zencore/sentryintegration.h> +#include <zencore/system.h> +#include <zencore/thread.h> #include <zencore/windows.h> #include <zenhttp/httpapiservice.h> #include <zenutil/service.h> -#include <zenutil/workerpools.h> ZEN_THIRD_PARTY_INCLUDES_START #include <cxxopts.hpp> @@ -53,12 +61,19 @@ ZenHubServerConfigurator::AddCliOptions(cxxopts::Options& Options) Options.add_option("hub", "", "instance-id", - "Instance ID for use in notifications", + "Instance ID for use in notifications (deprecated, use --upstream-notification-instance-id)", cxxopts::value<std::string>(m_ServerOptions.InstanceId)->default_value(""), ""); Options.add_option("hub", "", + "upstream-notification-instance-id", + "Instance ID for use in notifications", + cxxopts::value<std::string>(m_ServerOptions.InstanceId), + ""); + + Options.add_option("hub", + "", "consul-endpoint", "Consul endpoint URL for service registration (empty = disabled)", cxxopts::value<std::string>(m_ServerOptions.ConsulEndpoint)->default_value(""), @@ -88,13 +103,27 @@ ZenHubServerConfigurator::AddCliOptions(cxxopts::Options& Options) Options.add_option("hub", "", + "consul-register-hub", + "Register the hub parent service with Consul (instance registration is unaffected)", + cxxopts::value<bool>(m_ServerOptions.ConsulRegisterHub)->default_value("true"), + ""); + + Options.add_option("hub", + "", "hub-base-port-number", - "Base port number for provisioned instances", + "Base port number for provisioned instances (deprecated, use --hub-instance-base-port-number)", cxxopts::value<uint16_t>(m_ServerOptions.HubBasePortNumber)->default_value("21000"), ""); Options.add_option("hub", "", + "hub-instance-base-port-number", + "Base port number for provisioned instances", + cxxopts::value<uint16_t>(m_ServerOptions.HubBasePortNumber), + ""); + + Options.add_option("hub", + "", "hub-instance-limit", "Maximum number of provisioned instances for this hub", cxxopts::value<int>(m_ServerOptions.HubInstanceLimit)->default_value("1000"), @@ -113,6 +142,34 @@ ZenHubServerConfigurator::AddCliOptions(cxxopts::Options& Options) Options.add_option("hub", "", + "hub-instance-malloc", + "Select memory allocator for provisioned instances (ansi|stomp|rpmalloc|mimalloc)", + cxxopts::value<std::string>(m_ServerOptions.HubInstanceMalloc)->default_value(""), + "<allocator>"); + + Options.add_option("hub", + "", + "hub-instance-trace", + "Trace channel specification for provisioned instances (e.g. default, cpu,log, memory)", + cxxopts::value<std::string>(m_ServerOptions.HubInstanceTrace)->default_value(""), + "<channels>"); + + Options.add_option("hub", + "", + "hub-instance-tracehost", + "Trace host for provisioned instances", + cxxopts::value<std::string>(m_ServerOptions.HubInstanceTraceHost)->default_value(""), + "<host>"); + + Options.add_option("hub", + "", + "hub-instance-tracefile", + "Trace file path for provisioned instances", + cxxopts::value<std::string>(m_ServerOptions.HubInstanceTraceFile)->default_value(""), + "<path>"); + + Options.add_option("hub", + "", "hub-instance-http-threads", "Number of http server connection threads for provisioned instances", cxxopts::value<unsigned int>(m_ServerOptions.HubInstanceHttpThreadCount), @@ -131,6 +188,16 @@ ZenHubServerConfigurator::AddCliOptions(cxxopts::Options& Options) cxxopts::value(m_ServerOptions.HubInstanceConfigPath), "<instance config>"); + const uint32_t DefaultHubInstanceProvisionThreadCount = Max(GetHardwareConcurrency() / 4u, 2u); + + Options.add_option("hub", + "", + "hub-instance-provision-threads", + fmt::format("Number of threads for instance provisioning (default {})", DefaultHubInstanceProvisionThreadCount), + cxxopts::value<uint32_t>(m_ServerOptions.HubInstanceProvisionThreadCount) + ->default_value(fmt::format("{}", DefaultHubInstanceProvisionThreadCount)), + "<threads>"); + Options.add_option("hub", "", "hub-hydration-target-spec", @@ -139,6 +206,24 @@ ZenHubServerConfigurator::AddCliOptions(cxxopts::Options& Options) cxxopts::value(m_ServerOptions.HydrationTargetSpecification), "<hydration-target-spec>"); + Options.add_option("hub", + "", + "hub-hydration-target-config", + "Path to JSON file specifying the hydration target (mutually exclusive with " + "--hub-hydration-target-spec). Supported types: 'file', 's3'.", + cxxopts::value(m_ServerOptions.HydrationTargetConfigPath), + "<path>"); + + const uint32_t DefaultHubHydrationThreadCount = Max(GetHardwareConcurrency() / 4u, 2u); + + Options.add_option( + "hub", + "", + "hub-hydration-threads", + fmt::format("Number of threads for hydration/dehydration (default {})", DefaultHubHydrationThreadCount), + cxxopts::value<uint32_t>(m_ServerOptions.HubHydrationThreadCount)->default_value(fmt::format("{}", DefaultHubHydrationThreadCount)), + "<threads>"); + #if ZEN_PLATFORM_WINDOWS Options.add_option("hub", "", @@ -203,12 +288,112 @@ ZenHubServerConfigurator::AddCliOptions(cxxopts::Options& Options) "Request timeout in milliseconds for instance activity check requests", cxxopts::value<uint32_t>(m_ServerOptions.WatchdogConfig.ActivityCheckRequestTimeoutMs)->default_value("200"), "<ms>"); + + Options.add_option("hub", + "", + "hub-provision-disk-limit-bytes", + "Reject provisioning when used disk bytes exceed this value (0 = no limit).", + cxxopts::value<uint64_t>(m_ServerOptions.HubProvisionDiskLimitBytes), + "<bytes>"); + + Options.add_option("hub", + "", + "hub-provision-disk-limit-percent", + "Reject provisioning when used disk exceeds this percentage of total disk (0 = no limit).", + cxxopts::value<uint32_t>(m_ServerOptions.HubProvisionDiskLimitPercent), + "<percent>"); + + Options.add_option("hub", + "", + "hub-provision-memory-limit-bytes", + "Reject provisioning when used memory bytes exceed this value (0 = no limit).", + cxxopts::value<uint64_t>(m_ServerOptions.HubProvisionMemoryLimitBytes), + "<bytes>"); + + Options.add_option("hub", + "", + "hub-provision-memory-limit-percent", + "Reject provisioning when used memory exceeds this percentage of total RAM (0 = no limit).", + cxxopts::value<uint32_t>(m_ServerOptions.HubProvisionMemoryLimitPercent), + "<percent>"); } void ZenHubServerConfigurator::AddConfigOptions(LuaConfig::Options& Options) { - ZEN_UNUSED(Options); + using namespace std::literals; + + Options.AddOption("hub.upstreamnotification.endpoint"sv, + m_ServerOptions.UpstreamNotificationEndpoint, + "upstream-notification-endpoint"sv); + Options.AddOption("hub.upstreamnotification.instanceid"sv, m_ServerOptions.InstanceId, "upstream-notification-instance-id"sv); + + Options.AddOption("hub.consul.endpoint"sv, m_ServerOptions.ConsulEndpoint, "consul-endpoint"sv); + Options.AddOption("hub.consul.tokenenv"sv, m_ServerOptions.ConsulTokenEnv, "consul-token-env"sv); + Options.AddOption("hub.consul.healthintervalseconds"sv, + m_ServerOptions.ConsulHealthIntervalSeconds, + "consul-health-interval-seconds"sv); + Options.AddOption("hub.consul.deregisterafterseconds"sv, + m_ServerOptions.ConsulDeregisterAfterSeconds, + "consul-deregister-after-seconds"sv); + Options.AddOption("hub.consul.registerhub"sv, m_ServerOptions.ConsulRegisterHub, "consul-register-hub"sv); + + Options.AddOption("hub.instance.baseportnumber"sv, m_ServerOptions.HubBasePortNumber, "hub-instance-base-port-number"sv); + Options.AddOption("hub.instance.http"sv, m_ServerOptions.HubInstanceHttpClass, "hub-instance-http"sv); + Options.AddOption("hub.instance.malloc"sv, m_ServerOptions.HubInstanceMalloc, "hub-instance-malloc"sv); + Options.AddOption("hub.instance.trace"sv, m_ServerOptions.HubInstanceTrace, "hub-instance-trace"sv); + Options.AddOption("hub.instance.tracehost"sv, m_ServerOptions.HubInstanceTraceHost, "hub-instance-tracehost"sv); + Options.AddOption("hub.instance.tracefile"sv, m_ServerOptions.HubInstanceTraceFile, "hub-instance-tracefile"sv); + Options.AddOption("hub.instance.httpthreads"sv, m_ServerOptions.HubInstanceHttpThreadCount, "hub-instance-http-threads"sv); + Options.AddOption("hub.instance.corelimit"sv, m_ServerOptions.HubInstanceCoreLimit, "hub-instance-corelimit"sv); + Options.AddOption("hub.instance.config"sv, m_ServerOptions.HubInstanceConfigPath, "hub-instance-config"sv); + Options.AddOption("hub.instance.limits.count"sv, m_ServerOptions.HubInstanceLimit, "hub-instance-limit"sv); + Options.AddOption("hub.instance.limits.disklimitbytes"sv, + m_ServerOptions.HubProvisionDiskLimitBytes, + "hub-provision-disk-limit-bytes"sv); + Options.AddOption("hub.instance.limits.disklimitpercent"sv, + m_ServerOptions.HubProvisionDiskLimitPercent, + "hub-provision-disk-limit-percent"sv); + Options.AddOption("hub.instance.limits.memorylimitbytes"sv, + m_ServerOptions.HubProvisionMemoryLimitBytes, + "hub-provision-memory-limit-bytes"sv); + Options.AddOption("hub.instance.limits.memorylimitpercent"sv, + m_ServerOptions.HubProvisionMemoryLimitPercent, + "hub-provision-memory-limit-percent"sv); + Options.AddOption("hub.instance.provisionthreads"sv, + m_ServerOptions.HubInstanceProvisionThreadCount, + "hub-instance-provision-threads"sv); + + Options.AddOption("hub.hydration.targetspec"sv, m_ServerOptions.HydrationTargetSpecification, "hub-hydration-target-spec"sv); + Options.AddOption("hub.hydration.targetconfig"sv, m_ServerOptions.HydrationTargetConfigPath, "hub-hydration-target-config"sv); + Options.AddOption("hub.hydration.threads"sv, m_ServerOptions.HubHydrationThreadCount, "hub-hydration-threads"sv); + + Options.AddOption("hub.watchdog.cycleintervalms"sv, m_ServerOptions.WatchdogConfig.CycleIntervalMs, "hub-watchdog-cycle-interval-ms"sv); + Options.AddOption("hub.watchdog.cycleprocessingbudgetms"sv, + m_ServerOptions.WatchdogConfig.CycleProcessingBudgetMs, + "hub-watchdog-cycle-processing-budget-ms"sv); + Options.AddOption("hub.watchdog.instancecheckthrottlems"sv, + m_ServerOptions.WatchdogConfig.InstanceCheckThrottleMs, + "hub-watchdog-instance-check-throttle-ms"sv); + Options.AddOption("hub.watchdog.provisionedinactivitytimeoutseconds"sv, + m_ServerOptions.WatchdogConfig.ProvisionedInactivityTimeoutSeconds, + "hub-watchdog-provisioned-inactivity-timeout-seconds"sv); + Options.AddOption("hub.watchdog.hibernatedinactivitytimeoutseconds"sv, + m_ServerOptions.WatchdogConfig.HibernatedInactivityTimeoutSeconds, + "hub-watchdog-hibernated-inactivity-timeout-seconds"sv); + Options.AddOption("hub.watchdog.inactivitycheckmarginseconds"sv, + m_ServerOptions.WatchdogConfig.InactivityCheckMarginSeconds, + "hub-watchdog-inactivity-check-margin-seconds"sv); + Options.AddOption("hub.watchdog.activitycheckconnecttimeoutms"sv, + m_ServerOptions.WatchdogConfig.ActivityCheckConnectTimeoutMs, + "hub-watchdog-activity-check-connect-timeout-ms"sv); + Options.AddOption("hub.watchdog.activitycheckrequesttimeoutms"sv, + m_ServerOptions.WatchdogConfig.ActivityCheckRequestTimeoutMs, + "hub-watchdog-activity-check-request-timeout-ms"sv); + +#if ZEN_PLATFORM_WINDOWS + Options.AddOption("hub.usejobobject"sv, m_ServerOptions.HubUseJobObject, "hub-use-job-object"sv); +#endif } void @@ -226,6 +411,28 @@ ZenHubServerConfigurator::OnConfigFileParsed(LuaConfig::Options& LuaOptions) void ZenHubServerConfigurator::ValidateOptions() { + if (m_ServerOptions.HubProvisionDiskLimitPercent > 100) + { + throw OptionParseException( + fmt::format("'--hub-provision-disk-limit-percent' ({}) must be in range 0..100", m_ServerOptions.HubProvisionDiskLimitPercent), + {}); + } + if (m_ServerOptions.HubProvisionMemoryLimitPercent > 100) + { + throw OptionParseException(fmt::format("'--hub-provision-memory-limit-percent' ({}) must be in range 0..100", + m_ServerOptions.HubProvisionMemoryLimitPercent), + {}); + } + if (!m_ServerOptions.HydrationTargetSpecification.empty() && !m_ServerOptions.HydrationTargetConfigPath.empty()) + { + throw OptionParseException("'--hub-hydration-target-spec' and '--hub-hydration-target-config' are mutually exclusive", {}); + } + if (!m_ServerOptions.HydrationTargetConfigPath.empty() && !std::filesystem::exists(m_ServerOptions.HydrationTargetConfigPath)) + { + throw OptionParseException( + fmt::format("'--hub-hydration-target-config': file not found: '{}'", m_ServerOptions.HydrationTargetConfigPath.string()), + {}); + } } /////////////////////////////////////////////////////////////////////////// @@ -247,55 +454,71 @@ ZenHubServer::OnModuleStateChanged(std::string_view HubInstanceId, HubInstanceState NewState) { ZEN_UNUSED(PreviousState); - if (!m_ConsulClient) - { - return; - } - if (NewState == HubInstanceState::Provisioning || NewState == HubInstanceState::Provisioned) + if (NewState == HubInstanceState::Deprovisioning || NewState == HubInstanceState::Hibernating) { - consul::ServiceRegistrationInfo ServiceInfo{ - .ServiceId = std::string(ModuleId), - .ServiceName = "zen-storage", - .Port = Info.Port, - .HealthEndpoint = "health", - .Tags = std::vector<std::pair<std::string, std::string>>{std::make_pair("module", std::string(ModuleId)), - std::make_pair("zen-hub", std::string(HubInstanceId)), - std::make_pair("version", std::string(ZEN_CFG_VERSION))}, - .HealthIntervalSeconds = NewState == HubInstanceState::Provisioning - ? 0u - : m_ConsulHealthIntervalSeconds, // Disable health checks while not finished provisioning - .DeregisterAfterSeconds = NewState == HubInstanceState::Provisioning - ? 0u - : m_ConsulDeregisterAfterSeconds}; // Disable health checks while not finished provisioning - - if (!m_ConsulClient->RegisterService(ServiceInfo)) + if (Info.Port != 0) { - ZEN_WARN("Failed to register storage server instance for module '{}' with Consul, continuing anyway", ModuleId); - } - else - { - ZEN_INFO("Registered storage server instance for module '{}' at port {} with Consul as '{}'", - ModuleId, - Info.Port, - ServiceInfo.ServiceName); + m_Proxy->PrunePort(Info.Port); } } - else if (NewState == HubInstanceState::Unprovisioned) + + if (!m_ConsulClient) { - if (!m_ConsulClient->DeregisterService(ModuleId)) - { - ZEN_WARN("Failed to deregister storage server instance for module '{}' at port {} from Consul, continuing anyway", - ModuleId, - Info.Port); - } - else - { - ZEN_INFO("Deregistered storage server instance for module '{}' at port {} from Consul", ModuleId, Info.Port); - } + return; + } + + switch (NewState) + { + case HubInstanceState::Provisioning: + case HubInstanceState::Waking: + case HubInstanceState::Recovering: + case HubInstanceState::Provisioned: + { + const bool IsProvisioned = NewState == HubInstanceState::Provisioned; + + consul::ServiceRegistrationInfo ServiceInfo{ + .ServiceId = std::string(ModuleId), + .ServiceName = "zen-storage", + .Port = Info.Port, + .HealthEndpoint = "health", + .Tags = std::vector<std::pair<std::string, std::string>>{std::make_pair("module", std::string(ModuleId)), + std::make_pair("zen-hub", std::string(HubInstanceId)), + std::make_pair("version", std::string(ZEN_CFG_VERSION))}, + .HealthIntervalSeconds = IsProvisioned ? m_ConsulHealthIntervalSeconds : 0u, + .DeregisterAfterSeconds = IsProvisioned ? m_ConsulDeregisterAfterSeconds : 0u, + .InitialStatus = IsProvisioned ? "passing" : ""}; + + m_ConsulClient->RegisterService(ServiceInfo); + ZEN_INFO("Submitted Consul registration for storage server instance for module '{}' at port {} as '{}'", + ModuleId, + Info.Port, + ServiceInfo.ServiceName); + break; + } + case HubInstanceState::Deprovisioning: + case HubInstanceState::Hibernating: + case HubInstanceState::Obliterating: + case HubInstanceState::Crashed: + case HubInstanceState::Hibernated: + case HubInstanceState::Unprovisioned: + { + // A Consul registration is "live" while the module is in a register-state + // (Provisioning / Waking / Recovering / Provisioned). Deregister once when + // we leave a register-state into any non-register-state + const bool WasRegisteredState = + PreviousState == HubInstanceState::Provisioning || PreviousState == HubInstanceState::Waking || + PreviousState == HubInstanceState::Recovering || PreviousState == HubInstanceState::Provisioned; + if (WasRegisteredState) + { + m_ConsulClient->DeregisterService(ModuleId); + ZEN_INFO("Submitted Consul deregistration for storage server instance for module '{}' at port {}", ModuleId, Info.Port); + } + } + break; + default: + break; } - // Transitional states (Deprovisioning, Hibernating, Waking, Recovering, Crashed) - // and Hibernated are intentionally ignored. } int @@ -317,6 +540,10 @@ ZenHubServer::Initialize(const ZenHubServerConfig& ServerConfig, ZenServerState: // the main test range. ZenServerEnvironment::SetBaseChildId(1000); + m_ProvisionWorkerPool = + std::make_unique<WorkerThreadPool>(gsl::narrow<int>(ServerConfig.HubInstanceProvisionThreadCount), "hub_provision"); + m_HydrationWorkerPool = std::make_unique<WorkerThreadPool>(gsl::narrow<int>(ServerConfig.HubHydrationThreadCount), "hub_hydration"); + m_DebugOptionForcedCrash = ServerConfig.ShouldCrash; InitializeState(ServerConfig); @@ -342,12 +569,18 @@ ZenHubServer::Cleanup() m_IoRunner.join(); } - ShutdownServices(); if (m_Http) { m_Http->Close(); } + ShutdownServices(); + + if (m_Proxy) + { + m_Proxy->Shutdown(); + } + if (m_Hub) { m_Hub->Shutdown(); @@ -357,6 +590,7 @@ ZenHubServer::Cleanup() m_HubService.reset(); m_ApiService.reset(); m_Hub.reset(); + m_Proxy.reset(); m_ConsulRegistration.reset(); m_ConsulClient.reset(); @@ -373,49 +607,121 @@ ZenHubServer::InitializeState(const ZenHubServerConfig& ServerConfig) ZEN_UNUSED(ServerConfig); } +ResourceMetrics +ZenHubServer::ResolveLimits(const ZenHubServerConfig& ServerConfig) +{ + uint64_t DiskTotal = 0; + uint64_t MemoryTotal = 0; + + if (ServerConfig.HubProvisionDiskLimitPercent > 0) + { + DiskSpace Disk; + if (DiskSpaceInfo(ServerConfig.DataDir, Disk)) + { + DiskTotal = Disk.Total; + } + else + { + ZEN_WARN("Failed to query disk space for '{}'; disk percent limit will not be applied", ServerConfig.DataDir); + } + } + if (ServerConfig.HubProvisionMemoryLimitPercent > 0) + { + MemoryTotal = GetSystemMetrics().SystemMemoryMiB * 1024 * 1024; + } + + auto Resolve = [](uint64_t Bytes, uint32_t Pct, uint64_t Total) -> uint64_t { + const uint64_t PctBytes = Pct > 0 ? (Total * Pct) / 100 : 0; + if (Bytes > 0 && PctBytes > 0) + { + return Min(Bytes, PctBytes); + } + return Bytes > 0 ? Bytes : PctBytes; + }; + + return { + .DiskUsageBytes = Resolve(ServerConfig.HubProvisionDiskLimitBytes, ServerConfig.HubProvisionDiskLimitPercent, DiskTotal), + .MemoryUsageBytes = Resolve(ServerConfig.HubProvisionMemoryLimitBytes, ServerConfig.HubProvisionMemoryLimitPercent, MemoryTotal), + }; +} + void ZenHubServer::InitializeServices(const ZenHubServerConfig& ServerConfig) { ZEN_INFO("instantiating Hub"); + Hub::Configuration HubConfig{ + .UseJobObject = ServerConfig.HubUseJobObject, + .BasePortNumber = ServerConfig.HubBasePortNumber, + .InstanceLimit = ServerConfig.HubInstanceLimit, + .InstanceHttpThreadCount = ServerConfig.HubInstanceHttpThreadCount, + .InstanceCoreLimit = ServerConfig.HubInstanceCoreLimit, + .InstanceMalloc = ServerConfig.HubInstanceMalloc, + .InstanceTrace = ServerConfig.HubInstanceTrace, + .InstanceTraceHost = ServerConfig.HubInstanceTraceHost, + .InstanceTraceFile = ServerConfig.HubInstanceTraceFile, + .InstanceConfigPath = ServerConfig.HubInstanceConfigPath, + .HydrationTargetSpecification = ServerConfig.HydrationTargetSpecification, + .WatchDog = + { + .CycleInterval = std::chrono::milliseconds(ServerConfig.WatchdogConfig.CycleIntervalMs), + .CycleProcessingBudget = std::chrono::milliseconds(ServerConfig.WatchdogConfig.CycleProcessingBudgetMs), + .InstanceCheckThrottle = std::chrono::milliseconds(ServerConfig.WatchdogConfig.InstanceCheckThrottleMs), + .ProvisionedInactivityTimeout = std::chrono::seconds(ServerConfig.WatchdogConfig.ProvisionedInactivityTimeoutSeconds), + .HibernatedInactivityTimeout = std::chrono::seconds(ServerConfig.WatchdogConfig.HibernatedInactivityTimeoutSeconds), + .InactivityCheckMargin = std::chrono::seconds(ServerConfig.WatchdogConfig.InactivityCheckMarginSeconds), + .ActivityCheckConnectTimeout = std::chrono::milliseconds(ServerConfig.WatchdogConfig.ActivityCheckConnectTimeoutMs), + .ActivityCheckRequestTimeout = std::chrono::milliseconds(ServerConfig.WatchdogConfig.ActivityCheckRequestTimeoutMs), + }, + .ResourceLimits = ResolveLimits(ServerConfig), + .OptionalProvisionWorkerPool = m_ProvisionWorkerPool.get(), + .OptionalHydrationWorkerPool = m_HydrationWorkerPool.get()}; + + if (!ServerConfig.HydrationTargetConfigPath.empty()) + { + FileContents Contents = ReadFile(ServerConfig.HydrationTargetConfigPath); + if (!Contents) + { + throw zen::runtime_error("Failed to read hydration config '{}': {}", + ServerConfig.HydrationTargetConfigPath.string(), + Contents.ErrorCode.message()); + } + IoBuffer Buffer(Contents.Flatten()); + std::string_view JsonText(static_cast<const char*>(Buffer.GetData()), Buffer.GetSize()); + + std::string ParseError; + CbFieldIterator Root = LoadCompactBinaryFromJson(JsonText, ParseError); + if (!ParseError.empty() || !Root.IsObject()) + { + throw zen::runtime_error("Failed to parse hydration config '{}': {}", + ServerConfig.HydrationTargetConfigPath.string(), + ParseError.empty() ? "root must be a JSON object" : ParseError); + } + HubConfig.HydrationOptions = std::move(Root).AsObject(); + } + + m_Proxy = std::make_unique<HttpProxyHandler>(); + m_Hub = std::make_unique<Hub>( - Hub::Configuration{ - .UseJobObject = ServerConfig.HubUseJobObject, - .BasePortNumber = ServerConfig.HubBasePortNumber, - .InstanceLimit = ServerConfig.HubInstanceLimit, - .InstanceHttpThreadCount = ServerConfig.HubInstanceHttpThreadCount, - .InstanceCoreLimit = ServerConfig.HubInstanceCoreLimit, - .InstanceConfigPath = ServerConfig.HubInstanceConfigPath, - .HydrationTargetSpecification = ServerConfig.HydrationTargetSpecification, - .WatchDog = - { - .CycleInterval = std::chrono::milliseconds(ServerConfig.WatchdogConfig.CycleIntervalMs), - .CycleProcessingBudget = std::chrono::milliseconds(ServerConfig.WatchdogConfig.CycleProcessingBudgetMs), - .InstanceCheckThrottle = std::chrono::milliseconds(ServerConfig.WatchdogConfig.InstanceCheckThrottleMs), - .ProvisionedInactivityTimeout = std::chrono::seconds(ServerConfig.WatchdogConfig.ProvisionedInactivityTimeoutSeconds), - .HibernatedInactivityTimeout = std::chrono::seconds(ServerConfig.WatchdogConfig.HibernatedInactivityTimeoutSeconds), - .InactivityCheckMargin = std::chrono::seconds(ServerConfig.WatchdogConfig.InactivityCheckMarginSeconds), - .ActivityCheckConnectTimeout = std::chrono::milliseconds(ServerConfig.WatchdogConfig.ActivityCheckConnectTimeoutMs), - .ActivityCheckRequestTimeout = std::chrono::milliseconds(ServerConfig.WatchdogConfig.ActivityCheckRequestTimeoutMs), - }}, + std::move(HubConfig), ZenServerEnvironment(ZenServerEnvironment::Hub, ServerConfig.DataDir / "hub", ServerConfig.DataDir / "servers", ServerConfig.HubInstanceHttpClass), - &GetMediumWorkerPool(EWorkloadType::Background), - m_ConsulClient ? Hub::AsyncModuleStateChangeCallbackFunc{[this, HubInstanceId = fmt::format("zen-hub-{}", ServerConfig.InstanceId)]( - std::string_view ModuleId, - const HubProvisionedInstanceInfo& Info, - HubInstanceState PreviousState, - HubInstanceState NewState) { - OnModuleStateChanged(HubInstanceId, ModuleId, Info, PreviousState, NewState); - }} - : Hub::AsyncModuleStateChangeCallbackFunc{}); + Hub::AsyncModuleStateChangeCallbackFunc{ + [this, HubInstanceId = fmt::format("zen-hub-{}", ServerConfig.InstanceId)](std::string_view ModuleId, + const HubProvisionedInstanceInfo& Info, + HubInstanceState PreviousState, + HubInstanceState NewState) { + OnModuleStateChanged(HubInstanceId, ModuleId, Info, PreviousState, NewState); + }}); + + m_Proxy->SetPortValidator([Hub = m_Hub.get()](uint16_t Port) { return Hub->IsInstancePort(Port); }); ZEN_INFO("instantiating API service"); m_ApiService = std::make_unique<zen::HttpApiService>(*m_Http); ZEN_INFO("instantiating hub service"); - m_HubService = std::make_unique<HttpHubService>(*m_Hub, m_StatsService, m_StatusService); + m_HubService = std::make_unique<HttpHubService>(*m_Hub, *m_Proxy, m_StatsService, m_StatusService); m_HubService->SetNotificationEndpoint(ServerConfig.UpstreamNotificationEndpoint, ServerConfig.InstanceId); m_FrontendService = std::make_unique<HttpFrontendService>(m_ContentRoot, m_StatsService, m_StatusService); @@ -465,21 +771,32 @@ ZenHubServer::InitializeConsulRegistration(const ZenHubServerConfig& ServerConfi } else { - ZEN_INFO("Consul token read from environment variable '{}'", ConsulAccessTokenEnvName); + ZEN_INFO("Consul token will be read from environment variable '{}'", ConsulAccessTokenEnvName); } try { - m_ConsulClient = std::make_unique<consul::ConsulClient>(ServerConfig.ConsulEndpoint, ConsulAccessToken); + m_ConsulClient = std::make_unique<consul::ConsulClient>(consul::ConsulClient::Configuration{ + .BaseUri = ServerConfig.ConsulEndpoint, + .TokenEnvName = ConsulAccessTokenEnvName, + }); m_ConsulHealthIntervalSeconds = ServerConfig.ConsulHealthIntervalSeconds; m_ConsulDeregisterAfterSeconds = ServerConfig.ConsulDeregisterAfterSeconds; + if (!ServerConfig.ConsulRegisterHub) + { + ZEN_INFO( + "Hub parent Consul registration skipped (consul-register-hub is false); " + "instance registration remains enabled"); + return; + } + consul::ServiceRegistrationInfo Info; Info.ServiceId = fmt::format("zen-hub-{}", ServerConfig.InstanceId); Info.ServiceName = "zen-hub"; // Info.Address = "localhost"; // Let the consul agent figure out out external address // TODO: Info.BaseUri? Info.Port = static_cast<uint16_t>(EffectivePort); - Info.HealthEndpoint = "hub/health"; + Info.HealthEndpoint = "health"; Info.Tags = std::vector<std::pair<std::string, std::string>>{ std::make_pair("zen-hub", Info.ServiceId), std::make_pair("version", std::string(ZEN_CFG_VERSION)), @@ -569,6 +886,8 @@ ZenHubServer::Run() OnReady(); + StartSelfSession("zenhub"); + m_Http->Run(IsInteractiveMode); SetNewState(kShuttingDown); diff --git a/src/zenserver/hub/zenhubserver.h b/src/zenserver/hub/zenhubserver.h index 77df3eaa3..5e465bb14 100644 --- a/src/zenserver/hub/zenhubserver.h +++ b/src/zenserver/hub/zenhubserver.h @@ -3,8 +3,10 @@ #pragma once #include "hubinstancestate.h" +#include "resourcemetrics.h" #include "zenserver.h" +#include <zencore/workthreadpool.h> #include <zenutil/consul.h> namespace cxxopts { @@ -19,6 +21,7 @@ namespace zen { class HttpApiService; class HttpFrontendService; class HttpHubService; +class HttpProxyHandler; struct ZenHubWatchdogConfig { @@ -34,21 +37,33 @@ struct ZenHubWatchdogConfig struct ZenHubServerConfig : public ZenServerConfig { - std::string UpstreamNotificationEndpoint; - std::string InstanceId; // For use in notifications - std::string ConsulEndpoint; // If set, enables Consul service registration - std::string ConsulTokenEnv; // Environment variable name to read a Consul token from; defaults to CONSUL_HTTP_TOKEN if empty - uint32_t ConsulHealthIntervalSeconds = 10; // Interval in seconds between Consul health checks - uint32_t ConsulDeregisterAfterSeconds = 30; // Seconds before Consul deregisters an unhealthy service - uint16_t HubBasePortNumber = 21000; - int HubInstanceLimit = 1000; - bool HubUseJobObject = true; - std::string HubInstanceHttpClass = "asio"; - uint32_t HubInstanceHttpThreadCount = 0; // Automatic - int HubInstanceCoreLimit = 0; // Automatic - std::filesystem::path HubInstanceConfigPath; // Path to Lua config file - std::string HydrationTargetSpecification; // hydration/dehydration target specification + std::string UpstreamNotificationEndpoint; + std::string InstanceId; // For use in notifications + std::string ConsulEndpoint; // If set, enables Consul service registration + std::string ConsulTokenEnv; // Environment variable name to read a Consul token from; defaults to CONSUL_HTTP_TOKEN if empty + uint32_t ConsulHealthIntervalSeconds = 10; // Interval in seconds between Consul health checks + uint32_t ConsulDeregisterAfterSeconds = 30; // Seconds before Consul deregisters an unhealthy service + bool ConsulRegisterHub = true; // Whether to register the hub parent service with Consul (instance registration unaffected) + uint16_t HubBasePortNumber = 21000; + int HubInstanceLimit = 1000; + bool HubUseJobObject = true; + std::string HubInstanceHttpClass = "asio"; + std::string HubInstanceMalloc; + std::string HubInstanceTrace; + std::string HubInstanceTraceHost; + std::string HubInstanceTraceFile; + uint32_t HubInstanceHttpThreadCount = 0; // Automatic + uint32_t HubInstanceProvisionThreadCount = 0; // Synchronous provisioning + uint32_t HubHydrationThreadCount = 0; // Synchronous hydration/dehydration + int HubInstanceCoreLimit = 0; // Automatic + std::filesystem::path HubInstanceConfigPath; // Path to Lua config file + std::string HydrationTargetSpecification; // hydration/dehydration target specification + std::filesystem::path HydrationTargetConfigPath; // path to JSON config file (mutually exclusive with HydrationTargetSpecification) ZenHubWatchdogConfig WatchdogConfig; + uint64_t HubProvisionDiskLimitBytes = 0; + uint32_t HubProvisionDiskLimitPercent = 0; + uint64_t HubProvisionMemoryLimitBytes = 0; + uint32_t HubProvisionMemoryLimitPercent = 0; }; class Hub; @@ -115,7 +130,10 @@ private: std::filesystem::path m_ContentRoot; bool m_DebugOptionForcedCrash = false; - std::unique_ptr<Hub> m_Hub; + std::unique_ptr<HttpProxyHandler> m_Proxy; + std::unique_ptr<WorkerThreadPool> m_ProvisionWorkerPool; + std::unique_ptr<WorkerThreadPool> m_HydrationWorkerPool; + std::unique_ptr<Hub> m_Hub; std::unique_ptr<HttpHubService> m_HubService; std::unique_ptr<HttpApiService> m_ApiService; @@ -126,6 +144,8 @@ private: uint32_t m_ConsulHealthIntervalSeconds = 10; uint32_t m_ConsulDeregisterAfterSeconds = 30; + static ResourceMetrics ResolveLimits(const ZenHubServerConfig& ServerConfig); + void InitializeState(const ZenHubServerConfig& ServerConfig); void InitializeServices(const ZenHubServerConfig& ServerConfig); void RegisterServices(const ZenHubServerConfig& ServerConfig); |