aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver/hub
diff options
context:
space:
mode:
Diffstat (limited to 'src/zenserver/hub')
-rw-r--r--src/zenserver/hub/README.md17
-rw-r--r--src/zenserver/hub/httphubservice.cpp214
-rw-r--r--src/zenserver/hub/httphubservice.h17
-rw-r--r--src/zenserver/hub/httpproxyhandler.cpp528
-rw-r--r--src/zenserver/hub/httpproxyhandler.h52
-rw-r--r--src/zenserver/hub/hub.cpp1019
-rw-r--r--src/zenserver/hub/hub.h86
-rw-r--r--src/zenserver/hub/hubinstancestate.cpp2
-rw-r--r--src/zenserver/hub/hubinstancestate.h3
-rw-r--r--src/zenserver/hub/hydration.cpp2201
-rw-r--r--src/zenserver/hub/hydration.h71
-rw-r--r--src/zenserver/hub/storageserverinstance.cpp145
-rw-r--r--src/zenserver/hub/storageserverinstance.h62
-rw-r--r--src/zenserver/hub/zenhubserver.cpp477
-rw-r--r--src/zenserver/hub/zenhubserver.h50
15 files changed, 3689 insertions, 1255 deletions
diff --git a/src/zenserver/hub/README.md b/src/zenserver/hub/README.md
index 322be3649..c75349fa5 100644
--- a/src/zenserver/hub/README.md
+++ b/src/zenserver/hub/README.md
@@ -3,23 +3,32 @@
The Zen Server can act in a "hub" mode. In this mode, the only services offered are the basic health
and diagnostic services alongside an API to provision and deprovision Storage server instances.
+A module ID is an alphanumeric identifier (hyphens allowed) that identifies a dataset, typically
+associated with a content plug-in module.
+
## Generic Server API
GET `/health` - returns an `OK!` payload when all enabled services are up and responding
## Hub API
-GET `{moduleid}` - alphanumeric identifier to identify a dataset (typically associated with a content plug-in module)
-
-GET `/hub/status` - obtain a summary of the currently live instances
+GET `/hub/status` - obtain a summary of all currently live instances
GET `/hub/modules/{moduleid}` - retrieve information about a module
+DELETE `/hub/modules/{moduleid}` - obliterate a module (permanently destroys all data)
+
POST `/hub/modules/{moduleid}/provision` - provision service for module
POST `/hub/modules/{moduleid}/deprovision` - deprovision service for module
-GET `/hub/stats` - retrieve stats for service
+POST `/hub/modules/{moduleid}/hibernate` - hibernate a provisioned module
+
+POST `/hub/modules/{moduleid}/wake` - wake a hibernated module
+
+GET `/stats/hub` - retrieve stats for the hub service
+
+`/hub/proxy/{port}/{path}` - reverse proxy to a child instance dashboard (all HTTP verbs)
## Hub Configuration
diff --git a/src/zenserver/hub/httphubservice.cpp b/src/zenserver/hub/httphubservice.cpp
index ebefcf2e3..e4b0c28d0 100644
--- a/src/zenserver/hub/httphubservice.cpp
+++ b/src/zenserver/hub/httphubservice.cpp
@@ -2,6 +2,7 @@
#include "httphubservice.h"
+#include "httpproxyhandler.h"
#include "hub.h"
#include "storageserverinstance.h"
@@ -43,10 +44,11 @@ namespace {
}
} // namespace
-HttpHubService::HttpHubService(Hub& Hub, HttpStatsService& StatsService, HttpStatusService& StatusService)
+HttpHubService::HttpHubService(Hub& Hub, HttpProxyHandler& Proxy, HttpStatsService& StatsService, HttpStatusService& StatusService)
: m_Hub(Hub)
, m_StatsService(StatsService)
, m_StatusService(StatusService)
+, m_Proxy(Proxy)
{
using namespace std::literals;
@@ -67,6 +69,23 @@ HttpHubService::HttpHubService(Hub& Hub, HttpStatsService& StatsService, HttpSta
return true;
});
+ m_Router.AddMatcher("port", [](std::string_view Str) -> bool {
+ if (Str.empty())
+ {
+ return false;
+ }
+ for (const auto C : Str)
+ {
+ if (!std::isdigit(C))
+ {
+ return false;
+ }
+ }
+ return true;
+ });
+
+ m_Router.AddMatcher("proxypath", [](std::string_view Str) -> bool { return !Str.empty(); });
+
m_Router.RegisterRoute(
"status",
[this](HttpRouterRequest& Req) {
@@ -78,6 +97,10 @@ HttpHubService::HttpHubService(Hub& Hub, HttpStatsService& StatsService, HttpSta
Obj << "moduleId" << ModuleId;
Obj << "state" << ToString(Info.State);
Obj << "port" << Info.Port;
+ if (Info.StateChangeTime != std::chrono::system_clock::time_point::min())
+ {
+ Obj << "state_change_time" << ToDateTime(Info.StateChangeTime);
+ }
Obj.BeginObject("process_metrics");
{
Obj << "MemoryBytes" << Info.Metrics.MemoryBytes;
@@ -98,6 +121,11 @@ HttpHubService::HttpHubService(Hub& Hub, HttpStatsService& StatsService, HttpSta
HttpVerb::kGet);
m_Router.RegisterRoute(
+ "deprovision",
+ [this](HttpRouterRequest& Req) { HandleDeprovisionAll(Req.ServerRequest()); },
+ HttpVerb::kPost);
+
+ m_Router.RegisterRoute(
"modules/{moduleid}",
[this](HttpRouterRequest& Req) {
std::string_view ModuleId = Req.GetCapture(1);
@@ -229,15 +257,23 @@ HttpHubService::HttpHubService(Hub& Hub, HttpStatsService& StatsService, HttpSta
HttpVerb::kPost);
m_Router.RegisterRoute(
- "stats",
+ "proxy/{port}/{proxypath}",
[this](HttpRouterRequest& Req) {
- CbObjectWriter Obj;
- Obj << "currentInstanceCount" << m_Hub.GetInstanceCount();
- Obj << "maxInstanceCount" << m_Hub.GetMaxInstanceCount();
- Obj << "instanceLimit" << m_Hub.GetConfig().InstanceLimit;
- Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Obj.Save());
+ std::string_view PortStr = Req.GetCapture(1);
+
+ // Use RelativeUriWithExtension to preserve the file extension that the
+ // router's URI parser strips (e.g. ".css", ".js") - the upstream server
+ // needs the full path including the extension.
+ std::string_view FullUri = Req.ServerRequest().RelativeUriWithExtension();
+ std::string_view Prefix = "proxy/";
+
+ // FullUri is "proxy/{port}/{path...}" - skip past "proxy/{port}/"
+ size_t PathStart = Prefix.size() + PortStr.size() + 1;
+ std::string_view PathTail = (PathStart < FullUri.size()) ? FullUri.substr(PathStart) : std::string_view{};
+
+ m_Proxy.HandleProxyRequest(Req.ServerRequest(), PortStr, PathTail);
},
- HttpVerb::kGet);
+ HttpVerb::kGet | HttpVerb::kPost | HttpVerb::kPut | HttpVerb::kDelete | HttpVerb::kHead);
m_StatsService.RegisterHandler("hub", *this);
m_StatusService.RegisterHandler("hub", *this);
@@ -286,7 +322,37 @@ HttpHubService::HandleStatusRequest(HttpServerRequest& Request)
void
HttpHubService::HandleStatsRequest(HttpServerRequest& Request)
{
- Request.WriteResponse(HttpResponseCode::OK, CollectStats());
+ CbObjectWriter Cbo;
+
+ EmitSnapshot("requests", m_HttpRequests, Cbo);
+
+ Cbo << "currentInstanceCount" << m_Hub.GetInstanceCount();
+ Cbo << "maxInstanceCount" << m_Hub.GetMaxInstanceCount();
+ Cbo << "instanceLimit" << m_Hub.GetConfig().InstanceLimit;
+
+ SystemMetrics SysMetrics;
+ DiskSpace Disk;
+ m_Hub.GetMachineMetrics(SysMetrics, Disk);
+ Cbo.BeginObject("machine");
+ {
+ Cbo << "disk_free_bytes" << Disk.Free;
+ Cbo << "disk_total_bytes" << Disk.Total;
+ Cbo << "memory_avail_mib" << SysMetrics.AvailSystemMemoryMiB;
+ Cbo << "memory_total_mib" << SysMetrics.SystemMemoryMiB;
+ Cbo << "virtual_memory_avail_mib" << SysMetrics.AvailVirtualMemoryMiB;
+ Cbo << "virtual_memory_total_mib" << SysMetrics.VirtualMemoryMiB;
+ }
+ Cbo.EndObject();
+
+ const ResourceMetrics& Limits = m_Hub.GetConfig().ResourceLimits;
+ Cbo.BeginObject("resource_limits");
+ {
+ Cbo << "disk_bytes" << Limits.DiskUsageBytes;
+ Cbo << "memory_bytes" << Limits.MemoryUsageBytes;
+ }
+ Cbo.EndObject();
+
+ Request.WriteResponse(HttpResponseCode::OK, Cbo.Save());
}
CbObject
@@ -310,6 +376,81 @@ HttpHubService::GetActivityCounter()
}
void
+HttpHubService::HandleDeprovisionAll(HttpServerRequest& Request)
+{
+ std::vector<std::string> ModulesToDeprovision;
+ m_Hub.EnumerateModules([&ModulesToDeprovision](std::string_view ModuleId, const Hub::InstanceInfo& InstanceInfo) {
+ if (InstanceInfo.State == HubInstanceState::Provisioned || InstanceInfo.State == HubInstanceState::Hibernated)
+ {
+ ModulesToDeprovision.push_back(std::string(ModuleId));
+ }
+ });
+
+ if (ModulesToDeprovision.empty())
+ {
+ return Request.WriteResponse(HttpResponseCode::OK);
+ }
+ std::vector<std::string> Rejected;
+ std::vector<std::string> Accepted;
+ std::vector<std::string> Completed;
+ for (const std::string& ModuleId : ModulesToDeprovision)
+ {
+ Hub::Response Response = m_Hub.Deprovision(ModuleId);
+ switch (Response.ResponseCode)
+ {
+ case Hub::EResponseCode::NotFound:
+ // Ignore
+ break;
+ case Hub::EResponseCode::Rejected:
+ Rejected.push_back(ModuleId);
+ break;
+ case Hub::EResponseCode::Accepted:
+ Accepted.push_back(ModuleId);
+ break;
+ case Hub::EResponseCode::Completed:
+ Completed.push_back(ModuleId);
+ break;
+ }
+ }
+ if (Rejected.empty() && Accepted.empty() && Completed.empty())
+ {
+ return Request.WriteResponse(HttpResponseCode::OK);
+ }
+ HttpResponseCode Response = HttpResponseCode::OK;
+ CbObjectWriter Writer;
+ if (!Completed.empty())
+ {
+ Writer.BeginArray("Completed");
+ for (const std::string& ModuleId : Completed)
+ {
+ Writer.AddString(ModuleId);
+ }
+ Writer.EndArray(); // Completed
+ }
+ if (!Accepted.empty())
+ {
+ Writer.BeginArray("Accepted");
+ for (const std::string& ModuleId : Accepted)
+ {
+ Writer.AddString(ModuleId);
+ }
+ Writer.EndArray(); // Accepted
+ Response = HttpResponseCode::Accepted;
+ }
+ if (!Rejected.empty())
+ {
+ Writer.BeginArray("Rejected");
+ for (const std::string& ModuleId : Rejected)
+ {
+ Writer.AddString(ModuleId);
+ }
+ Writer.EndArray(); // Rejected
+ Response = HttpResponseCode::Conflict;
+ }
+ Request.WriteResponse(Response, Writer.Save());
+}
+
+void
HttpHubService::HandleModuleGet(HttpServerRequest& Request, std::string_view ModuleId)
{
Hub::InstanceInfo InstanceInfo;
@@ -328,45 +469,36 @@ HttpHubService::HandleModuleGet(HttpServerRequest& Request, std::string_view Mod
void
HttpHubService::HandleModuleDelete(HttpServerRequest& Request, std::string_view ModuleId)
{
- Hub::InstanceInfo InstanceInfo;
- if (!m_Hub.Find(ModuleId, &InstanceInfo))
+ Hub::Response Resp = m_Hub.Obliterate(std::string(ModuleId));
+
+ if (HandleFailureResults(Request, Resp))
{
- Request.WriteResponse(HttpResponseCode::NotFound);
return;
}
- if (InstanceInfo.State == HubInstanceState::Provisioned || InstanceInfo.State == HubInstanceState::Hibernated ||
- InstanceInfo.State == HubInstanceState::Crashed)
- {
- try
- {
- Hub::Response Resp = m_Hub.Deprovision(std::string(ModuleId));
-
- if (HandleFailureResults(Request, Resp))
- {
- return;
- }
-
- // TODO: nuke all related storage
+ const HttpResponseCode HttpCode =
+ (Resp.ResponseCode == Hub::EResponseCode::Accepted) ? HttpResponseCode::Accepted : HttpResponseCode::OK;
+ CbObjectWriter Obj;
+ Obj << "moduleId" << ModuleId;
+ Request.WriteResponse(HttpCode, Obj.Save());
+}
- const HttpResponseCode HttpCode =
- (Resp.ResponseCode == Hub::EResponseCode::Accepted) ? HttpResponseCode::Accepted : HttpResponseCode::OK;
- CbObjectWriter Obj;
- Obj << "moduleId" << ModuleId;
- return Request.WriteResponse(HttpCode, Obj.Save());
- }
- catch (const std::exception& Ex)
- {
- ZEN_ERROR("Exception while deprovisioning module '{}': {}", ModuleId, Ex.what());
- throw;
- }
- }
+void
+HttpHubService::OnWebSocketOpen(Ref<WebSocketConnection> Connection, std::string_view RelativeUri)
+{
+ m_Proxy.OnWebSocketOpen(std::move(Connection), RelativeUri);
+}
- // TODO: nuke all related storage
+void
+HttpHubService::OnWebSocketMessage(WebSocketConnection& Conn, const WebSocketMessage& Msg)
+{
+ m_Proxy.OnWebSocketMessage(Conn, Msg);
+}
- CbObjectWriter Obj;
- Obj << "moduleId" << ModuleId;
- Request.WriteResponse(HttpResponseCode::OK, Obj.Save());
+void
+HttpHubService::OnWebSocketClose(WebSocketConnection& Conn, uint16_t Code, std::string_view Reason)
+{
+ m_Proxy.OnWebSocketClose(Conn, Code, Reason);
}
} // namespace zen
diff --git a/src/zenserver/hub/httphubservice.h b/src/zenserver/hub/httphubservice.h
index 1bb1c303e..f4d1b0b89 100644
--- a/src/zenserver/hub/httphubservice.h
+++ b/src/zenserver/hub/httphubservice.h
@@ -2,11 +2,16 @@
#pragma once
+#include <zencore/thread.h>
#include <zenhttp/httpserver.h>
#include <zenhttp/httpstatus.h>
+#include <zenhttp/websocket.h>
+
+#include <memory>
namespace zen {
+class HttpProxyHandler;
class HttpStatsService;
class Hub;
@@ -16,10 +21,10 @@ class Hub;
* use in UEFN content worker style scenarios.
*
*/
-class HttpHubService : public HttpService, public IHttpStatusProvider, public IHttpStatsProvider
+class HttpHubService : public HttpService, public IHttpStatusProvider, public IHttpStatsProvider, public IWebSocketHandler
{
public:
- HttpHubService(Hub& Hub, HttpStatsService& StatsService, HttpStatusService& StatusService);
+ HttpHubService(Hub& Hub, HttpProxyHandler& Proxy, HttpStatsService& StatsService, HttpStatusService& StatusService);
~HttpHubService();
HttpHubService(const HttpHubService&) = delete;
@@ -32,6 +37,11 @@ public:
virtual CbObject CollectStats() override;
virtual uint64_t GetActivityCounter() override;
+ // IWebSocketHandler
+ void OnWebSocketOpen(Ref<WebSocketConnection> Connection, std::string_view RelativeUri) override;
+ void OnWebSocketMessage(WebSocketConnection& Conn, const WebSocketMessage& Msg) override;
+ void OnWebSocketClose(WebSocketConnection& Conn, uint16_t Code, std::string_view Reason) override;
+
void SetNotificationEndpoint(std::string_view UpstreamNotificationEndpoint, std::string_view InstanceId);
private:
@@ -43,8 +53,11 @@ private:
HttpStatsService& m_StatsService;
HttpStatusService& m_StatusService;
+ void HandleDeprovisionAll(HttpServerRequest& Request);
void HandleModuleGet(HttpServerRequest& Request, std::string_view ModuleId);
void HandleModuleDelete(HttpServerRequest& Request, std::string_view ModuleId);
+
+ HttpProxyHandler& m_Proxy;
};
} // namespace zen
diff --git a/src/zenserver/hub/httpproxyhandler.cpp b/src/zenserver/hub/httpproxyhandler.cpp
new file mode 100644
index 000000000..235d7388f
--- /dev/null
+++ b/src/zenserver/hub/httpproxyhandler.cpp
@@ -0,0 +1,528 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "httpproxyhandler.h"
+
+#include <zencore/fmtutils.h>
+#include <zencore/logging.h>
+#include <zencore/string.h>
+#include <zenhttp/httpclient.h>
+#include <zenhttp/httpwsclient.h>
+
+ZEN_THIRD_PARTY_INCLUDES_START
+#include <fmt/format.h>
+ZEN_THIRD_PARTY_INCLUDES_END
+
+#include <charconv>
+
+#if ZEN_WITH_TESTS
+# include <zencore/testing.h>
+#endif // ZEN_WITH_TESTS
+
+namespace zen {
+
+namespace {
+
+ std::string InjectProxyScript(std::string_view Html, uint16_t Port)
+ {
+ ExtendableStringBuilder<2048> Script;
+ Script.Append("<script>\n(function(){\n var P = \"/hub/proxy/");
+ Script.Append(fmt::format("{}", Port));
+ Script.Append(
+ "\";\n"
+ " var OF = window.fetch;\n"
+ " window.fetch = function(u, o) {\n"
+ " if (typeof u === \"string\") {\n"
+ " try {\n"
+ " var p = new URL(u, location.origin);\n"
+ " if (p.origin === location.origin && !p.pathname.startsWith(P))\n"
+ " { p.pathname = P + p.pathname; u = p.toString(); }\n"
+ " } catch(e) {\n"
+ " if (u.startsWith(\"/\") && !u.startsWith(P)) u = P + u;\n"
+ " }\n"
+ " }\n"
+ " return OF.call(this, u, o);\n"
+ " };\n"
+ " var OW = window.WebSocket;\n"
+ " window.WebSocket = function(u, pr) {\n"
+ " try {\n"
+ " var p = new URL(u);\n"
+ " if (p.hostname === location.hostname\n"
+ " && String(p.port || (p.protocol === \"wss:\" ? \"443\" : \"80\"))\n"
+ " === String(location.port || (location.protocol === \"https:\" ? \"443\" : \"80\"))\n"
+ " && !p.pathname.startsWith(P))\n"
+ " { p.pathname = P + p.pathname; u = p.toString(); }\n"
+ " } catch(e) {}\n"
+ " return pr !== undefined ? new OW(u, pr) : new OW(u);\n"
+ " };\n"
+ " window.WebSocket.prototype = OW.prototype;\n"
+ " window.WebSocket.CONNECTING = OW.CONNECTING;\n"
+ " window.WebSocket.OPEN = OW.OPEN;\n"
+ " window.WebSocket.CLOSING = OW.CLOSING;\n"
+ " window.WebSocket.CLOSED = OW.CLOSED;\n"
+ " var OO = window.open;\n"
+ " window.open = function(u, t, f) {\n"
+ " if (typeof u === \"string\") {\n"
+ " try {\n"
+ " var p = new URL(u, location.origin);\n"
+ " if (p.origin === location.origin && !p.pathname.startsWith(P))\n"
+ " { p.pathname = P + p.pathname; u = p.toString(); }\n"
+ " } catch(e) {}\n"
+ " }\n"
+ " return OO.call(this, u, t, f);\n"
+ " };\n"
+ " document.addEventListener(\"click\", function(e) {\n"
+ " var t = e.composedPath ? e.composedPath()[0] : e.target;\n"
+ " while (t && t.tagName !== \"A\") t = t.parentNode || t.host;\n"
+ " if (!t || !t.href) return;\n"
+ " try {\n"
+ " var h = new URL(t.href);\n"
+ " if (h.origin === location.origin && !h.pathname.startsWith(P))\n"
+ " { h.pathname = P + h.pathname; e.preventDefault(); window.location.href = h.toString(); }\n"
+ " } catch(x) {}\n"
+ " }, true);\n"
+ "})();\n</script>");
+
+ std::string ScriptStr = Script.ToString();
+
+ size_t HeadClose = Html.find("</head>");
+ if (HeadClose != std::string_view::npos)
+ {
+ std::string Result;
+ Result.reserve(Html.size() + ScriptStr.size());
+ Result.append(Html.substr(0, HeadClose));
+ Result.append(ScriptStr);
+ Result.append(Html.substr(HeadClose));
+ return Result;
+ }
+
+ std::string Result;
+ Result.reserve(Html.size() + ScriptStr.size());
+ Result.append(ScriptStr);
+ Result.append(Html);
+ return Result;
+ }
+
+} // namespace
+
+struct HttpProxyHandler::WsBridge : public RefCounted, public IWsClientHandler
+{
+ Ref<WebSocketConnection> ClientConn;
+ std::unique_ptr<HttpWsClient> UpstreamClient;
+ uint16_t Port = 0;
+
+ void OnWsOpen() override {}
+
+ void OnWsMessage(const WebSocketMessage& Msg) override
+ {
+ if (!ClientConn->IsOpen())
+ {
+ return;
+ }
+ switch (Msg.Opcode)
+ {
+ case WebSocketOpcode::kText:
+ ClientConn->SendText(std::string_view(static_cast<const char*>(Msg.Payload.GetData()), Msg.Payload.GetSize()));
+ break;
+ case WebSocketOpcode::kBinary:
+ ClientConn->SendBinary(std::span<const uint8_t>(static_cast<const uint8_t*>(Msg.Payload.GetData()), Msg.Payload.GetSize()));
+ break;
+ default:
+ break;
+ }
+ }
+
+ void OnWsClose(uint16_t Code, std::string_view Reason) override
+ {
+ if (ClientConn->IsOpen())
+ {
+ ClientConn->Close(Code, Reason);
+ }
+ }
+};
+
+HttpProxyHandler::HttpProxyHandler()
+{
+}
+
+HttpProxyHandler::HttpProxyHandler(PortValidator ValidatePort) : m_ValidatePort(std::move(ValidatePort))
+{
+}
+
+void
+HttpProxyHandler::SetPortValidator(PortValidator ValidatePort)
+{
+ m_ValidatePort = std::move(ValidatePort);
+}
+
+HttpProxyHandler::~HttpProxyHandler()
+{
+ try
+ {
+ Shutdown();
+ }
+ catch (...)
+ {
+ }
+}
+
+HttpClient&
+HttpProxyHandler::GetOrCreateProxyClient(uint16_t Port)
+{
+ HttpClient* Result = nullptr;
+ m_ProxyClientsLock.WithExclusiveLock([&] {
+ auto It = m_ProxyClients.find(Port);
+ if (It == m_ProxyClients.end())
+ {
+ HttpClientSettings Settings;
+ Settings.LogCategory = "hub-proxy";
+ Settings.ConnectTimeout = std::chrono::milliseconds(5000);
+ Settings.Timeout = std::chrono::milliseconds(30000);
+ auto Client = std::make_unique<HttpClient>(fmt::format("http://127.0.0.1:{}", Port), Settings);
+ Result = Client.get();
+ m_ProxyClients.emplace(Port, std::move(Client));
+ }
+ else
+ {
+ Result = It->second.get();
+ }
+ });
+ return *Result;
+}
+
+void
+HttpProxyHandler::HandleProxyRequest(HttpServerRequest& Request, std::string_view PortStr, std::string_view PathTail)
+{
+ uint16_t Port = 0;
+ auto [Ptr, Ec] = std::from_chars(PortStr.data(), PortStr.data() + PortStr.size(), Port);
+ if (Ec != std::errc{} || Ptr != PortStr.data() + PortStr.size())
+ {
+ Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "invalid proxy URL");
+ return;
+ }
+
+ if (!m_ValidatePort(Port))
+ {
+ Request.WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, "target instance not available");
+ return;
+ }
+
+ HttpClient& Client = GetOrCreateProxyClient(Port);
+
+ std::string RequestPath;
+ RequestPath.reserve(1 + PathTail.size());
+ RequestPath.push_back('/');
+ RequestPath.append(PathTail);
+
+ std::string_view QueryString = Request.QueryString();
+ if (!QueryString.empty())
+ {
+ RequestPath.push_back('?');
+ RequestPath.append(QueryString);
+ }
+
+ HttpClient::KeyValueMap ForwardHeaders;
+ HttpContentType AcceptType = Request.AcceptContentType();
+ if (AcceptType != HttpContentType::kUnknownContentType)
+ {
+ ForwardHeaders->emplace("Accept", std::string(MapContentTypeToString(AcceptType)));
+ }
+
+ std::string_view Auth = Request.GetAuthorizationHeader();
+ if (!Auth.empty())
+ {
+ ForwardHeaders->emplace("Authorization", std::string(Auth));
+ }
+
+ HttpContentType ReqContentType = Request.RequestContentType();
+ if (ReqContentType != HttpContentType::kUnknownContentType)
+ {
+ ForwardHeaders->emplace("Content-Type", std::string(MapContentTypeToString(ReqContentType)));
+ }
+
+ HttpClient::Response Response;
+
+ switch (Request.RequestVerb())
+ {
+ case HttpVerb::kGet:
+ Response = Client.Get(RequestPath, ForwardHeaders);
+ break;
+ case HttpVerb::kPost:
+ {
+ IoBuffer Payload = Request.ReadPayload();
+ Response = Client.Post(RequestPath, Payload, ForwardHeaders);
+ break;
+ }
+ case HttpVerb::kPut:
+ {
+ IoBuffer Payload = Request.ReadPayload();
+ Response = Client.Put(RequestPath, Payload, ForwardHeaders);
+ break;
+ }
+ case HttpVerb::kDelete:
+ Response = Client.Delete(RequestPath, ForwardHeaders);
+ break;
+ case HttpVerb::kHead:
+ Response = Client.Head(RequestPath, ForwardHeaders);
+ break;
+ default:
+ Request.WriteResponse(HttpResponseCode::MethodNotAllowed, HttpContentType::kText, "method not supported");
+ return;
+ }
+
+ if (Response.Error)
+ {
+ if (!m_ValidatePort(Port))
+ {
+ Request.WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, "target instance not available");
+ return;
+ }
+
+ ZEN_WARN("proxy request to port {} failed: {}", Port, Response.Error->ErrorMessage);
+ switch (Response.Error->ErrorCode)
+ {
+ case HttpClientErrorCode::kConnectionFailure:
+ case HttpClientErrorCode::kHostResolutionFailure:
+ return Request.WriteResponse(HttpResponseCode::NotFound,
+ HttpContentType::kText,
+ fmt::format("instance not reachable: {}", Response.Error->ErrorMessage));
+ case HttpClientErrorCode::kOperationTimedOut:
+ return Request.WriteResponse(HttpResponseCode::GatewayTimeout,
+ HttpContentType::kText,
+ fmt::format("upstream request timed out: {}", Response.Error->ErrorMessage));
+ case HttpClientErrorCode::kRequestCancelled:
+ return Request.WriteResponse(HttpResponseCode::ServiceUnavailable,
+ HttpContentType::kText,
+ fmt::format("upstream request cancelled: {}", Response.Error->ErrorMessage));
+ default:
+ return Request.WriteResponse(HttpResponseCode::BadGateway,
+ HttpContentType::kText,
+ fmt::format("upstream request failed: {}", Response.Error->ErrorMessage));
+ }
+ }
+
+ HttpContentType ContentType = Response.ResponsePayload.GetContentType();
+
+ if (ContentType == HttpContentType::kHTML)
+ {
+ std::string_view Html(static_cast<const char*>(Response.ResponsePayload.GetData()), Response.ResponsePayload.GetSize());
+ std::string Injected = InjectProxyScript(Html, Port);
+ Request.WriteResponse(Response.StatusCode, HttpContentType::kHTML, std::string_view(Injected));
+ }
+ else
+ {
+ Request.WriteResponse(Response.StatusCode, ContentType, std::move(Response.ResponsePayload));
+ }
+}
+
+void
+HttpProxyHandler::PrunePort(uint16_t Port)
+{
+ m_ProxyClientsLock.WithExclusiveLock([&] { m_ProxyClients.erase(Port); });
+
+ std::vector<Ref<WsBridge>> Stale;
+ m_WsBridgesLock.WithExclusiveLock([&] {
+ for (auto It = m_WsBridges.begin(); It != m_WsBridges.end();)
+ {
+ if (It->second->Port == Port)
+ {
+ Stale.push_back(std::move(It->second));
+ It = m_WsBridges.erase(It);
+ }
+ else
+ {
+ ++It;
+ }
+ }
+ });
+
+ for (auto& Bridge : Stale)
+ {
+ if (Bridge->UpstreamClient)
+ {
+ Bridge->UpstreamClient->Close(1001, "instance shutting down");
+ }
+ if (Bridge->ClientConn->IsOpen())
+ {
+ Bridge->ClientConn->Close(1001, "instance shutting down");
+ }
+ }
+}
+
+void
+HttpProxyHandler::Shutdown()
+{
+ m_WsBridgesLock.WithExclusiveLock([&] { m_WsBridges.clear(); });
+ m_ProxyClientsLock.WithExclusiveLock([&] { m_ProxyClients.clear(); });
+}
+
+//////////////////////////////////////////////////////////////////////////
+//
+// WebSocket proxy
+//
+
+void
+HttpProxyHandler::OnWebSocketOpen(Ref<WebSocketConnection> Connection, std::string_view RelativeUri)
+{
+ const std::string_view ProxyPrefix = "proxy/";
+ if (!RelativeUri.starts_with(ProxyPrefix))
+ {
+ Connection->Close(1008, "unsupported WebSocket endpoint");
+ return;
+ }
+
+ std::string_view ProxyTail = RelativeUri.substr(ProxyPrefix.size());
+
+ size_t SlashPos = ProxyTail.find('/');
+ std::string_view PortStr = (SlashPos != std::string_view::npos) ? ProxyTail.substr(0, SlashPos) : ProxyTail;
+ std::string_view Path = (SlashPos != std::string_view::npos) ? ProxyTail.substr(SlashPos) : "/";
+
+ uint16_t Port = 0;
+ auto [Ptr, Ec] = std::from_chars(PortStr.data(), PortStr.data() + PortStr.size(), Port);
+ if (Ec != std::errc{} || Ptr != PortStr.data() + PortStr.size())
+ {
+ Connection->Close(1008, "invalid proxy URL");
+ return;
+ }
+
+ if (!m_ValidatePort(Port))
+ {
+ Connection->Close(1008, "target instance not available");
+ return;
+ }
+
+ std::string WsUrl = HttpToWsUrl(fmt::format("http://127.0.0.1:{}", Port), Path);
+
+ Ref<WsBridge> Bridge(new WsBridge());
+ Bridge->ClientConn = Connection;
+ Bridge->Port = Port;
+
+ Bridge->UpstreamClient = std::make_unique<HttpWsClient>(WsUrl, *Bridge);
+
+ try
+ {
+ Bridge->UpstreamClient->Connect();
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_WARN("proxy WebSocket connect to {} failed: {}", WsUrl, Ex.what());
+ Connection->Close(1011, "upstream connect failed");
+ return;
+ }
+
+ WebSocketConnection* Key = Connection.Get();
+ m_WsBridgesLock.WithExclusiveLock([&] { m_WsBridges.emplace(Key, std::move(Bridge)); });
+}
+
+void
+HttpProxyHandler::OnWebSocketMessage(WebSocketConnection& Conn, const WebSocketMessage& Msg)
+{
+ Ref<WsBridge> Bridge;
+ m_WsBridgesLock.WithSharedLock([&] {
+ auto It = m_WsBridges.find(&Conn);
+ if (It != m_WsBridges.end())
+ {
+ Bridge = It->second;
+ }
+ });
+
+ if (!Bridge || !Bridge->UpstreamClient)
+ {
+ return;
+ }
+
+ switch (Msg.Opcode)
+ {
+ case WebSocketOpcode::kText:
+ Bridge->UpstreamClient->SendText(std::string_view(static_cast<const char*>(Msg.Payload.GetData()), Msg.Payload.GetSize()));
+ break;
+ case WebSocketOpcode::kBinary:
+ Bridge->UpstreamClient->SendBinary(
+ std::span<const uint8_t>(static_cast<const uint8_t*>(Msg.Payload.GetData()), Msg.Payload.GetSize()));
+ break;
+ case WebSocketOpcode::kClose:
+ Bridge->UpstreamClient->Close(Msg.CloseCode, {});
+ break;
+ default:
+ break;
+ }
+}
+
+void
+HttpProxyHandler::OnWebSocketClose(WebSocketConnection& Conn, uint16_t Code, std::string_view Reason)
+{
+ Ref<WsBridge> Bridge = m_WsBridgesLock.WithExclusiveLock([this, &Conn]() -> Ref<WsBridge> {
+ auto It = m_WsBridges.find(&Conn);
+ if (It != m_WsBridges.end())
+ {
+ Ref<WsBridge> Bridge = std::move(It->second);
+ m_WsBridges.erase(It);
+ return Bridge;
+ }
+ return {};
+ });
+
+ if (Bridge && Bridge->UpstreamClient)
+ {
+ Bridge->UpstreamClient->Close(Code, Reason);
+ }
+}
+
+#if ZEN_WITH_TESTS
+
+TEST_SUITE_BEGIN("server.httpproxyhandler");
+
+TEST_CASE("server.httpproxyhandler.html_injection")
+{
+ SUBCASE("injects before </head>")
+ {
+ std::string Result = InjectProxyScript("<html><head></head><body></body></html>", 21005);
+ CHECK(Result.find("<script>") != std::string::npos);
+ CHECK(Result.find("/hub/proxy/21005") != std::string::npos);
+ size_t ScriptEnd = Result.find("</script>");
+ size_t HeadClose = Result.find("</head>");
+ REQUIRE(ScriptEnd != std::string::npos);
+ REQUIRE(HeadClose != std::string::npos);
+ CHECK(ScriptEnd < HeadClose);
+ }
+
+ SUBCASE("prepends when no </head>")
+ {
+ std::string Result = InjectProxyScript("<body>content</body>", 21005);
+ CHECK(Result.find("<script>") == 0);
+ CHECK(Result.find("<body>content</body>") != std::string::npos);
+ }
+
+ SUBCASE("empty html")
+ {
+ std::string Result = InjectProxyScript("", 21005);
+ CHECK(Result.find("<script>") != std::string::npos);
+ CHECK(Result.find("/hub/proxy/21005") != std::string::npos);
+ }
+
+ SUBCASE("preserves original content")
+ {
+ std::string_view Html = "<html><head><title>Test</title></head><body><h1>Dashboard</h1></body></html>";
+ std::string Result = InjectProxyScript(Html, 21005);
+ CHECK(Result.find("<title>Test</title>") != std::string::npos);
+ CHECK(Result.find("<h1>Dashboard</h1>") != std::string::npos);
+ }
+}
+
+TEST_CASE("server.httpproxyhandler.port_embedding")
+{
+ std::string Result = InjectProxyScript("<head></head>", 80);
+ CHECK(Result.find("/hub/proxy/80") != std::string::npos);
+
+ Result = InjectProxyScript("<head></head>", 65535);
+ CHECK(Result.find("/hub/proxy/65535") != std::string::npos);
+}
+
+TEST_SUITE_END();
+
+void
+httpproxyhandler_forcelink()
+{
+}
+#endif // ZEN_WITH_TESTS
+
+} // namespace zen
diff --git a/src/zenserver/hub/httpproxyhandler.h b/src/zenserver/hub/httpproxyhandler.h
new file mode 100644
index 000000000..8667c0ca1
--- /dev/null
+++ b/src/zenserver/hub/httpproxyhandler.h
@@ -0,0 +1,52 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zencore/thread.h>
+#include <zenhttp/httpserver.h>
+#include <zenhttp/websocket.h>
+
+#include <functional>
+#include <memory>
+#include <unordered_map>
+
+namespace zen {
+
+class HttpClient;
+
+class HttpProxyHandler
+{
+public:
+ using PortValidator = std::function<bool(uint16_t)>;
+
+ HttpProxyHandler();
+ explicit HttpProxyHandler(PortValidator ValidatePort);
+ ~HttpProxyHandler();
+
+ void SetPortValidator(PortValidator ValidatePort);
+
+ HttpProxyHandler(const HttpProxyHandler&) = delete;
+ HttpProxyHandler& operator=(const HttpProxyHandler&) = delete;
+
+ void HandleProxyRequest(HttpServerRequest& Request, std::string_view PortStr, std::string_view PathTail);
+ void PrunePort(uint16_t Port);
+ void Shutdown();
+
+ void OnWebSocketOpen(Ref<WebSocketConnection> Connection, std::string_view RelativeUri);
+ void OnWebSocketMessage(WebSocketConnection& Conn, const WebSocketMessage& Msg);
+ void OnWebSocketClose(WebSocketConnection& Conn, uint16_t Code, std::string_view Reason);
+
+private:
+ PortValidator m_ValidatePort;
+
+ HttpClient& GetOrCreateProxyClient(uint16_t Port);
+
+ RwLock m_ProxyClientsLock;
+ std::unordered_map<uint16_t, std::unique_ptr<HttpClient>> m_ProxyClients;
+
+ struct WsBridge;
+ RwLock m_WsBridgesLock;
+ std::unordered_map<WebSocketConnection*, Ref<WsBridge>> m_WsBridges;
+};
+
+} // namespace zen
diff --git a/src/zenserver/hub/hub.cpp b/src/zenserver/hub/hub.cpp
index 6c44e2333..978814b46 100644
--- a/src/zenserver/hub/hub.cpp
+++ b/src/zenserver/hub/hub.cpp
@@ -19,7 +19,6 @@ ZEN_THIRD_PARTY_INCLUDES_START
ZEN_THIRD_PARTY_INCLUDES_END
#if ZEN_WITH_TESTS
-# include <zencore/filesystem.h>
# include <zencore/testing.h>
# include <zencore/testutils.h>
#endif
@@ -122,32 +121,84 @@ private:
//////////////////////////////////////////////////////////////////////////
-Hub::Hub(const Configuration& Config,
- ZenServerEnvironment&& RunEnvironment,
- WorkerThreadPool* OptionalWorkerPool,
- AsyncModuleStateChangeCallbackFunc&& ModuleStateChangeCallback)
+ProcessMetrics
+Hub::AtomicProcessMetrics::Load() const
+{
+ return {
+ .MemoryBytes = MemoryBytes.load(),
+ .KernelTimeMs = KernelTimeMs.load(),
+ .UserTimeMs = UserTimeMs.load(),
+ .WorkingSetSize = WorkingSetSize.load(),
+ .PeakWorkingSetSize = PeakWorkingSetSize.load(),
+ .PagefileUsage = PagefileUsage.load(),
+ .PeakPagefileUsage = PeakPagefileUsage.load(),
+ };
+}
+
+void
+Hub::AtomicProcessMetrics::Store(const ProcessMetrics& Metrics)
+{
+ MemoryBytes.store(Metrics.MemoryBytes);
+ KernelTimeMs.store(Metrics.KernelTimeMs);
+ UserTimeMs.store(Metrics.UserTimeMs);
+ WorkingSetSize.store(Metrics.WorkingSetSize);
+ PeakWorkingSetSize.store(Metrics.PeakWorkingSetSize);
+ PagefileUsage.store(Metrics.PagefileUsage);
+ PeakPagefileUsage.store(Metrics.PeakPagefileUsage);
+}
+
+void
+Hub::AtomicProcessMetrics::Reset()
+{
+ MemoryBytes.store(0);
+ KernelTimeMs.store(0);
+ UserTimeMs.store(0);
+ WorkingSetSize.store(0);
+ PeakWorkingSetSize.store(0);
+ PagefileUsage.store(0);
+ PeakPagefileUsage.store(0);
+}
+
+void
+Hub::GetMachineMetrics(SystemMetrics& OutSystemMetrict, DiskSpace& OutDiskSpace) const
+{
+ m_Lock.WithSharedLock([&]() {
+ OutSystemMetrict = m_SystemMetrics;
+ OutDiskSpace = m_DiskSpace;
+ });
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+Hub::Hub(const Configuration& Config, ZenServerEnvironment&& RunEnvironment, AsyncModuleStateChangeCallbackFunc&& ModuleStateChangeCallback)
: m_Config(Config)
, m_RunEnvironment(std::move(RunEnvironment))
-, m_WorkerPool(OptionalWorkerPool)
+, m_WorkerPool(Config.OptionalProvisionWorkerPool)
, m_BackgroundWorkLatch(1)
, m_ModuleStateChangeCallback(std::move(ModuleStateChangeCallback))
, m_ActiveInstances(Config.InstanceLimit)
, m_FreeActiveInstanceIndexes(Config.InstanceLimit)
{
- m_HostMetrics = GetSystemMetrics();
- m_ResourceLimits.DiskUsageBytes = 1000ull * 1024 * 1024 * 1024;
- m_ResourceLimits.MemoryUsageBytes = 16ull * 1024 * 1024 * 1024;
+ ZEN_ASSERT_FORMAT(
+ Config.OptionalProvisionWorkerPool != Config.OptionalHydrationWorkerPool || Config.OptionalProvisionWorkerPool == nullptr,
+ "Provision and hydration worker pools must be distinct to avoid deadlocks");
- if (m_Config.HydrationTargetSpecification.empty())
+ HydrationBase::Configuration HydrationConfig;
+ if (!m_Config.HydrationTargetSpecification.empty())
+ {
+ HydrationConfig.TargetSpecification = m_Config.HydrationTargetSpecification;
+ }
+ else if (!m_Config.HydrationOptions)
{
std::filesystem::path FileHydrationPath = m_RunEnvironment.CreateChildDir("hydration_storage");
ZEN_INFO("using file hydration path: '{}'", FileHydrationPath);
- m_HydrationTargetSpecification = fmt::format("file://{}", WideToUtf8(FileHydrationPath.native()));
+ HydrationConfig.TargetSpecification = fmt::format("file://{}", WideToUtf8(FileHydrationPath.native()));
}
else
{
- m_HydrationTargetSpecification = m_Config.HydrationTargetSpecification;
+ HydrationConfig.Options = m_Config.HydrationOptions;
}
+ m_Hydration = InitHydration(HydrationConfig);
m_HydrationTempPath = m_RunEnvironment.CreateChildDir("hydration_temp");
ZEN_INFO("using hydration temp path: '{}'", m_HydrationTempPath);
@@ -171,6 +222,9 @@ Hub::Hub(const Configuration& Config,
}
}
#endif
+
+ UpdateMachineMetrics();
+
m_WatchDog = std::thread([this]() { WatchDog(); });
}
@@ -195,6 +249,9 @@ Hub::Shutdown()
{
ZEN_INFO("Hub service shutting down, deprovisioning any current instances");
+ bool Expected = false;
+ bool WaitForBackgroundWork = m_ShutdownFlag.compare_exchange_strong(Expected, true);
+
m_WatchDogEvent.Set();
if (m_WatchDog.joinable())
{
@@ -203,8 +260,6 @@ Hub::Shutdown()
m_WatchDog = {};
- bool Expected = false;
- bool WaitForBackgroundWork = m_ShutdownFlag.compare_exchange_strong(Expected, true);
if (WaitForBackgroundWork && m_WorkerPool)
{
m_BackgroundWorkLatch.CountDown();
@@ -254,7 +309,7 @@ Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo)
if (auto It = m_InstanceLookup.find(std::string(ModuleId)); It == m_InstanceLookup.end())
{
std::string Reason;
- if (!CanProvisionInstance(ModuleId, /* out */ Reason))
+ if (!CanProvisionInstanceLocked(ModuleId, /* out */ Reason))
{
ZEN_WARN("Cannot provision new storage server instance for module '{}': {}", ModuleId, Reason);
@@ -271,12 +326,18 @@ Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo)
{
auto NewInstance = std::make_unique<StorageServerInstance>(
m_RunEnvironment,
- StorageServerInstance::Configuration{.BasePort = GetInstanceIndexAssignedPort(ActiveInstanceIndex),
- .HydrationTempPath = m_HydrationTempPath,
- .HydrationTargetSpecification = m_HydrationTargetSpecification,
- .HttpThreadCount = m_Config.InstanceHttpThreadCount,
- .CoreLimit = m_Config.InstanceCoreLimit,
- .ConfigPath = m_Config.InstanceConfigPath},
+ *m_Hydration,
+ StorageServerInstance::Configuration{.BasePort = GetInstanceIndexAssignedPort(ActiveInstanceIndex),
+ .StateDir = m_RunEnvironment.CreateChildDir(ModuleId),
+ .TempDir = m_HydrationTempPath / ModuleId,
+ .HttpThreadCount = m_Config.InstanceHttpThreadCount,
+ .CoreLimit = m_Config.InstanceCoreLimit,
+ .ConfigPath = m_Config.InstanceConfigPath,
+ .Malloc = m_Config.InstanceMalloc,
+ .Trace = m_Config.InstanceTrace,
+ .TraceHost = m_Config.InstanceTraceHost,
+ .TraceFile = m_Config.InstanceTraceFile,
+ .OptionalWorkerPool = m_Config.OptionalHydrationWorkerPool},
ModuleId);
#if ZEN_PLATFORM_WINDOWS
@@ -289,6 +350,7 @@ Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo)
Instance = NewInstance->LockExclusive(/*Wait*/ true);
m_ActiveInstances[ActiveInstanceIndex].Instance = std::move(NewInstance);
+ m_ActiveInstances[ActiveInstanceIndex].ProcessMetrics.Reset();
m_InstanceLookup.insert_or_assign(std::string(ModuleId), ActiveInstanceIndex);
// Set Provisioning while both hub lock and instance lock are held so that any
// concurrent Deprovision sees the in-flight state, not Unprovisioned.
@@ -329,11 +391,14 @@ Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo)
switch (CurrentState)
{
case HubInstanceState::Provisioning:
+ case HubInstanceState::Recovering:
+ case HubInstanceState::Waking:
return Response{EResponseCode::Accepted};
case HubInstanceState::Crashed:
case HubInstanceState::Unprovisioned:
break;
case HubInstanceState::Provisioned:
+ m_ActiveInstances[ActiveInstanceIndex].LastActivityTime.store(std::chrono::system_clock::now());
return Response{EResponseCode::Completed};
case HubInstanceState::Hibernated:
_.ReleaseNow();
@@ -354,6 +419,7 @@ Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo)
Instance = {};
if (ActualState == HubInstanceState::Provisioned)
{
+ m_ActiveInstances[ActiveInstanceIndex].LastActivityTime.store(std::chrono::system_clock::now());
return Response{EResponseCode::Completed};
}
if (ActualState == HubInstanceState::Provisioning)
@@ -540,6 +606,7 @@ Hub::InternalDeprovision(const std::string& ModuleId, std::function<bool(ActiveI
switch (CurrentState)
{
case HubInstanceState::Deprovisioning:
+ case HubInstanceState::Obliterating:
return Response{EResponseCode::Accepted};
case HubInstanceState::Crashed:
case HubInstanceState::Hibernated:
@@ -585,11 +652,11 @@ Hub::InternalDeprovision(const std::string& ModuleId, std::function<bool(ActiveI
try
{
m_WorkerPool->ScheduleWork(
- [this, ModuleId = std::string(ModuleId), ActiveInstanceIndex, Instance = std::move(SharedInstancePtr)]() mutable {
+ [this, ModuleId = std::string(ModuleId), ActiveInstanceIndex, Instance = std::move(SharedInstancePtr), OldState]() mutable {
auto _ = MakeGuard([this]() { m_BackgroundWorkLatch.CountDown(); });
try
{
- CompleteDeprovision(*Instance, ActiveInstanceIndex);
+ CompleteDeprovision(*Instance, ActiveInstanceIndex, OldState);
}
catch (const std::exception& Ex)
{
@@ -617,20 +684,222 @@ Hub::InternalDeprovision(const std::string& ModuleId, std::function<bool(ActiveI
}
else
{
- CompleteDeprovision(Instance, ActiveInstanceIndex);
+ CompleteDeprovision(Instance, ActiveInstanceIndex, OldState);
+ }
+
+ return Response{m_WorkerPool ? EResponseCode::Accepted : EResponseCode::Completed};
+}
+
+Hub::Response
+Hub::Obliterate(const std::string& ModuleId)
+{
+ ZEN_ASSERT(!m_ShutdownFlag.load());
+
+ StorageServerInstance::ExclusiveLockedPtr Instance;
+ size_t ActiveInstanceIndex = (size_t)-1;
+ {
+ RwLock::ExclusiveLockScope Lock(m_Lock);
+
+ if (auto It = m_InstanceLookup.find(ModuleId); It != m_InstanceLookup.end())
+ {
+ ActiveInstanceIndex = It->second;
+ ZEN_ASSERT(ActiveInstanceIndex < m_ActiveInstances.size());
+
+ HubInstanceState CurrentState = m_ActiveInstances[ActiveInstanceIndex].State.load();
+
+ switch (CurrentState)
+ {
+ case HubInstanceState::Obliterating:
+ return Response{EResponseCode::Accepted};
+ case HubInstanceState::Provisioned:
+ case HubInstanceState::Hibernated:
+ case HubInstanceState::Crashed:
+ break;
+ case HubInstanceState::Deprovisioning:
+ return Response{EResponseCode::Rejected,
+ fmt::format("Module '{}' is being deprovisioned, retry after completion", ModuleId)};
+ case HubInstanceState::Recovering:
+ return Response{EResponseCode::Rejected, fmt::format("Module '{}' is currently recovering from a crash", ModuleId)};
+ case HubInstanceState::Unprovisioned:
+ return Response{EResponseCode::Completed};
+ default:
+ return Response{EResponseCode::Rejected,
+ fmt::format("Module '{}' is currently in state '{}'", ModuleId, ToString(CurrentState))};
+ }
+
+ std::unique_ptr<StorageServerInstance>& RawInstance = m_ActiveInstances[ActiveInstanceIndex].Instance;
+ ZEN_ASSERT(RawInstance != nullptr);
+
+ Instance = RawInstance->LockExclusive(/*Wait*/ true);
+ }
+ else
+ {
+ // Module not tracked by hub - obliterate backend data directly.
+ // Covers the deprovisioned case where data was preserved via dehydration.
+ if (m_ObliteratingInstances.contains(ModuleId))
+ {
+ return Response{EResponseCode::Accepted};
+ }
+
+ m_ObliteratingInstances.insert(ModuleId);
+ Lock.ReleaseNow();
+
+ if (m_WorkerPool)
+ {
+ m_BackgroundWorkLatch.AddCount(1);
+ try
+ {
+ m_WorkerPool->ScheduleWork(
+ [this, ModuleId = std::string(ModuleId)]() {
+ auto Guard = MakeGuard([this, ModuleId]() {
+ m_Lock.WithExclusiveLock([this, ModuleId]() { m_ObliteratingInstances.erase(ModuleId); });
+ m_BackgroundWorkLatch.CountDown();
+ });
+ try
+ {
+ ObliterateBackendData(ModuleId);
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("Failed async obliterate of untracked module '{}': {}", ModuleId, Ex.what());
+ }
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
+ }
+ catch (const std::exception& DispatchEx)
+ {
+ ZEN_ERROR("Failed to dispatch async obliterate of untracked module '{}': {}", ModuleId, DispatchEx.what());
+ m_BackgroundWorkLatch.CountDown();
+ {
+ RwLock::ExclusiveLockScope _(m_Lock);
+ m_ObliteratingInstances.erase(ModuleId);
+ }
+ throw;
+ }
+
+ return Response{EResponseCode::Accepted};
+ }
+
+ auto _ = MakeGuard([this, &ModuleId]() {
+ RwLock::ExclusiveLockScope _(m_Lock);
+ m_ObliteratingInstances.erase(ModuleId);
+ });
+
+ ObliterateBackendData(ModuleId);
+
+ return Response{EResponseCode::Completed};
+ }
+ }
+
+ HubInstanceState OldState = UpdateInstanceState(Instance, ActiveInstanceIndex, HubInstanceState::Obliterating);
+ const uint16_t Port = Instance.GetBasePort();
+ NotifyStateUpdate(ModuleId, OldState, HubInstanceState::Obliterating, Port, {});
+
+ if (m_WorkerPool)
+ {
+ std::shared_ptr<StorageServerInstance::ExclusiveLockedPtr> SharedInstancePtr =
+ std::make_shared<StorageServerInstance::ExclusiveLockedPtr>(std::move(Instance));
+
+ m_BackgroundWorkLatch.AddCount(1);
+ try
+ {
+ m_WorkerPool->ScheduleWork(
+ [this, ModuleId = std::string(ModuleId), ActiveInstanceIndex, Instance = std::move(SharedInstancePtr)]() mutable {
+ auto _ = MakeGuard([this]() { m_BackgroundWorkLatch.CountDown(); });
+ try
+ {
+ CompleteObliterate(*Instance, ActiveInstanceIndex);
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("Failed async obliterate of module '{}': {}", ModuleId, Ex.what());
+ }
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
+ }
+ catch (const std::exception& DispatchEx)
+ {
+ ZEN_ERROR("Failed async dispatch obliterate of module '{}': {}", ModuleId, DispatchEx.what());
+ m_BackgroundWorkLatch.CountDown();
+
+ NotifyStateUpdate(ModuleId, HubInstanceState::Obliterating, OldState, Port, {});
+ {
+ RwLock::ExclusiveLockScope HubLock(m_Lock);
+ ZEN_ASSERT_SLOW(m_InstanceLookup.find(std::string(ModuleId)) != m_InstanceLookup.end());
+ ZEN_ASSERT_SLOW(m_InstanceLookup.find(std::string(ModuleId))->second == ActiveInstanceIndex);
+ UpdateInstanceState(HubLock, ActiveInstanceIndex, OldState);
+ }
+
+ throw;
+ }
+ }
+ else
+ {
+ CompleteObliterate(Instance, ActiveInstanceIndex);
}
return Response{m_WorkerPool ? EResponseCode::Accepted : EResponseCode::Completed};
}
void
-Hub::CompleteDeprovision(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex)
+Hub::CompleteObliterate(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex)
{
const std::string ModuleId(Instance.GetModuleId());
const uint16_t Port = Instance.GetBasePort();
try
{
+ Instance.Obliterate();
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("Failed to obliterate storage server instance for module '{}': {}", ModuleId, Ex.what());
+ Instance = {};
+ {
+ RwLock::ExclusiveLockScope HubLock(m_Lock);
+ UpdateInstanceState(HubLock, ActiveInstanceIndex, HubInstanceState::Crashed);
+ }
+ NotifyStateUpdate(ModuleId, HubInstanceState::Obliterating, HubInstanceState::Crashed, Port, {});
+ throw;
+ }
+
+ NotifyStateUpdate(ModuleId, HubInstanceState::Obliterating, HubInstanceState::Unprovisioned, Port, {});
+ RemoveInstance(Instance, ActiveInstanceIndex, ModuleId);
+}
+
+void
+Hub::CompleteDeprovision(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex, HubInstanceState OldState)
+{
+ const std::string ModuleId(Instance.GetModuleId());
+ const uint16_t Port = Instance.GetBasePort();
+
+ try
+ {
+ if (OldState == HubInstanceState::Provisioned)
+ {
+ ZEN_INFO("Triggering GC for module {}", ModuleId);
+
+ HttpClient GcClient(fmt::format("http://localhost:{}", Port));
+
+ HttpClient::KeyValueMap Params;
+ Params.Entries.insert({"smallobjects", "true"});
+ Params.Entries.insert({"skipcid", "false"});
+ HttpClient::Response Response = GcClient.Post("/admin/gc", HttpClient::Accept(HttpContentType::kCbObject), Params);
+ Stopwatch Timer;
+ while (Response && Timer.GetElapsedTimeMs() < 5000)
+ {
+ Response = GcClient.Get("/admin/gc", HttpClient::Accept(HttpContentType::kCbObject));
+ if (Response)
+ {
+ bool Complete = Response.AsObject()["Status"].AsString() != "Running";
+ if (Complete)
+ {
+ break;
+ }
+ Sleep(50);
+ }
+ }
+ }
Instance.Deprovision();
}
catch (const std::exception& Ex)
@@ -649,20 +918,7 @@ Hub::CompleteDeprovision(StorageServerInstance::ExclusiveLockedPtr& Instance, si
}
NotifyStateUpdate(ModuleId, HubInstanceState::Deprovisioning, HubInstanceState::Unprovisioned, Port, {});
- Instance = {};
-
- std::unique_ptr<StorageServerInstance> DeleteInstance;
- {
- RwLock::ExclusiveLockScope HubLock(m_Lock);
- auto It = m_InstanceLookup.find(std::string(ModuleId));
- ZEN_ASSERT_SLOW(It != m_InstanceLookup.end());
- ZEN_ASSERT_SLOW(It->second == ActiveInstanceIndex);
- DeleteInstance = std::move(m_ActiveInstances[ActiveInstanceIndex].Instance);
- m_FreeActiveInstanceIndexes.push_back(ActiveInstanceIndex);
- m_InstanceLookup.erase(It);
- UpdateInstanceState(HubLock, ActiveInstanceIndex, HubInstanceState::Unprovisioned);
- }
- DeleteInstance.reset();
+ RemoveInstance(Instance, ActiveInstanceIndex, ModuleId);
}
Hub::Response
@@ -935,6 +1191,46 @@ Hub::CompleteWake(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t Ac
}
}
+void
+Hub::RemoveInstance(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex, std::string_view ModuleId)
+{
+ Instance = {};
+
+ std::unique_ptr<StorageServerInstance> DeleteInstance;
+ {
+ RwLock::ExclusiveLockScope HubLock(m_Lock);
+ auto It = m_InstanceLookup.find(std::string(ModuleId));
+ ZEN_ASSERT_SLOW(It != m_InstanceLookup.end());
+ ZEN_ASSERT_SLOW(It->second == ActiveInstanceIndex);
+ DeleteInstance = std::move(m_ActiveInstances[ActiveInstanceIndex].Instance);
+ m_FreeActiveInstanceIndexes.push_back(ActiveInstanceIndex);
+ m_InstanceLookup.erase(It);
+ UpdateInstanceState(HubLock, ActiveInstanceIndex, HubInstanceState::Unprovisioned);
+ }
+ DeleteInstance.reset();
+}
+
+void
+Hub::ObliterateBackendData(std::string_view ModuleId)
+{
+ std::filesystem::path ServerStateDir = m_RunEnvironment.GetChildBaseDir() / ModuleId;
+ std::filesystem::path TempDir = m_HydrationTempPath / ModuleId;
+
+ std::atomic<bool> AbortFlag{false};
+ std::atomic<bool> PauseFlag{false};
+
+ HydrationConfig Config{.ServerStateDir = ServerStateDir, .TempDir = TempDir, .ModuleId = std::string(ModuleId)};
+ if (m_Config.OptionalHydrationWorkerPool)
+ {
+ Config.Threading.emplace(HydrationConfig::ThreadingOptions{.WorkerPool = m_Config.OptionalHydrationWorkerPool,
+ .AbortFlag = &AbortFlag,
+ .PauseFlag = &PauseFlag});
+ }
+
+ std::unique_ptr<HydrationStrategyBase> Hydrator = m_Hydration->CreateHydrator(Config);
+ Hydrator->Obliterate();
+}
+
bool
Hub::Find(std::string_view ModuleId, InstanceInfo* OutInstanceInfo)
{
@@ -947,12 +1243,10 @@ Hub::Find(std::string_view ModuleId, InstanceInfo* OutInstanceInfo)
ZEN_ASSERT(ActiveInstanceIndex < m_ActiveInstances.size());
const std::unique_ptr<StorageServerInstance>& Instance = m_ActiveInstances[ActiveInstanceIndex].Instance;
ZEN_ASSERT(Instance);
- InstanceInfo Info{
- m_ActiveInstances[ActiveInstanceIndex].State.load(),
- std::chrono::system_clock::now() // TODO
- };
- Instance->GetProcessMetrics(Info.Metrics);
- Info.Port = Instance->GetBasePort();
+ InstanceInfo Info{m_ActiveInstances[ActiveInstanceIndex].State.load(),
+ m_ActiveInstances[ActiveInstanceIndex].StateChangeTime.load()};
+ Info.Metrics = m_ActiveInstances[ActiveInstanceIndex].ProcessMetrics.Load();
+ Info.Port = Instance->GetBasePort();
*OutInstanceInfo = Info;
}
@@ -971,12 +1265,10 @@ Hub::EnumerateModules(std::function<void(std::string_view ModuleId, const Instan
{
const std::unique_ptr<StorageServerInstance>& Instance = m_ActiveInstances[ActiveInstanceIndex].Instance;
ZEN_ASSERT(Instance);
- InstanceInfo Info{
- m_ActiveInstances[ActiveInstanceIndex].State.load(),
- std::chrono::system_clock::now() // TODO
- };
- Instance->GetProcessMetrics(Info.Metrics);
- Info.Port = Instance->GetBasePort();
+ InstanceInfo Info{m_ActiveInstances[ActiveInstanceIndex].State.load(),
+ m_ActiveInstances[ActiveInstanceIndex].StateChangeTime.load()};
+ Info.Metrics = m_ActiveInstances[ActiveInstanceIndex].ProcessMetrics.Load();
+ Info.Port = Instance->GetBasePort();
Infos.push_back(std::make_pair(std::string(Instance->GetModuleId()), Info));
}
@@ -994,30 +1286,15 @@ Hub::GetInstanceCount()
return m_Lock.WithSharedLock([this]() { return gsl::narrow_cast<int>(m_InstanceLookup.size()); });
}
-void
-Hub::UpdateCapacityMetrics()
-{
- m_HostMetrics = GetSystemMetrics();
-
- // TODO: Should probably go into WatchDog and use atomic for update so it can be read without locks...
- // Per-instance stats are already refreshed by WatchDog and are readable via the Find and EnumerateModules
-}
-
-void
-Hub::UpdateStats()
-{
- int CurrentInstanceCount = m_Lock.WithSharedLock([this] { return gsl::narrow_cast<int>(m_InstanceLookup.size()); });
- int CurrentMaxCount = m_MaxInstanceCount.load();
-
- int NewMax = Max(CurrentMaxCount, CurrentInstanceCount);
-
- m_MaxInstanceCount.compare_exchange_weak(CurrentMaxCount, NewMax);
-}
-
bool
-Hub::CanProvisionInstance(std::string_view ModuleId, std::string& OutReason)
+Hub::CanProvisionInstanceLocked(std::string_view ModuleId, std::string& OutReason)
{
- ZEN_UNUSED(ModuleId);
+ if (m_ObliteratingInstances.contains(std::string(ModuleId)))
+ {
+ OutReason = fmt::format("module '{}' is being obliterated", ModuleId);
+ return false;
+ }
+
if (m_FreeActiveInstanceIndexes.empty())
{
OutReason = fmt::format("instance limit ({}) exceeded", m_Config.InstanceLimit);
@@ -1025,7 +1302,24 @@ Hub::CanProvisionInstance(std::string_view ModuleId, std::string& OutReason)
return false;
}
- // TODO: handle additional resource metrics
+ const uint64_t DiskUsedBytes = m_DiskSpace.Free <= m_DiskSpace.Total ? m_DiskSpace.Total - m_DiskSpace.Free : 0;
+ if (m_Config.ResourceLimits.DiskUsageBytes > 0 && DiskUsedBytes > m_Config.ResourceLimits.DiskUsageBytes)
+ {
+ OutReason =
+ fmt::format("disk usage ({}) exceeds ({})", NiceBytes(DiskUsedBytes), NiceBytes(m_Config.ResourceLimits.DiskUsageBytes));
+ return false;
+ }
+
+ const uint64_t RamUsedMiB = m_SystemMetrics.AvailSystemMemoryMiB <= m_SystemMetrics.SystemMemoryMiB
+ ? m_SystemMetrics.SystemMemoryMiB - m_SystemMetrics.AvailSystemMemoryMiB
+ : 0;
+ const uint64_t RamUsedBytes = RamUsedMiB * 1024 * 1024;
+ if (m_Config.ResourceLimits.MemoryUsageBytes > 0 && RamUsedBytes > m_Config.ResourceLimits.MemoryUsageBytes)
+ {
+ OutReason =
+ fmt::format("ram usage ({}) exceeds ({})", NiceBytes(RamUsedBytes), NiceBytes(m_Config.ResourceLimits.MemoryUsageBytes));
+ return false;
+ }
return true;
}
@@ -1036,6 +1330,21 @@ Hub::GetInstanceIndexAssignedPort(size_t ActiveInstanceIndex) const
return gsl::narrow<uint16_t>(m_Config.BasePortNumber + ActiveInstanceIndex);
}
+bool
+Hub::IsInstancePort(uint16_t Port) const
+{
+ if (Port < m_Config.BasePortNumber)
+ {
+ return false;
+ }
+ size_t Index = Port - m_Config.BasePortNumber;
+ if (Index >= m_ActiveInstances.size())
+ {
+ return false;
+ }
+ return m_ActiveInstances[Index].State.load(std::memory_order_relaxed) != HubInstanceState::Unprovisioned;
+}
+
HubInstanceState
Hub::UpdateInstanceStateLocked(size_t ActiveInstanceIndex, HubInstanceState NewState)
{
@@ -1046,11 +1355,13 @@ Hub::UpdateInstanceStateLocked(size_t ActiveInstanceIndex, HubInstanceState NewS
case HubInstanceState::Unprovisioned:
return To == HubInstanceState::Provisioning;
case HubInstanceState::Provisioned:
- return To == HubInstanceState::Hibernating || To == HubInstanceState::Deprovisioning || To == HubInstanceState::Crashed;
+ return To == HubInstanceState::Hibernating || To == HubInstanceState::Deprovisioning || To == HubInstanceState::Crashed ||
+ To == HubInstanceState::Obliterating;
case HubInstanceState::Hibernated:
- return To == HubInstanceState::Waking || To == HubInstanceState::Deprovisioning;
+ return To == HubInstanceState::Waking || To == HubInstanceState::Deprovisioning || To == HubInstanceState::Obliterating;
case HubInstanceState::Crashed:
- return To == HubInstanceState::Provisioning || To == HubInstanceState::Deprovisioning || To == HubInstanceState::Recovering;
+ return To == HubInstanceState::Provisioning || To == HubInstanceState::Deprovisioning ||
+ To == HubInstanceState::Recovering || To == HubInstanceState::Obliterating;
case HubInstanceState::Provisioning:
return To == HubInstanceState::Provisioned || To == HubInstanceState::Unprovisioned || To == HubInstanceState::Crashed;
case HubInstanceState::Hibernating:
@@ -1062,11 +1373,15 @@ Hub::UpdateInstanceStateLocked(size_t ActiveInstanceIndex, HubInstanceState NewS
To == HubInstanceState::Crashed;
case HubInstanceState::Recovering:
return To == HubInstanceState::Provisioned || To == HubInstanceState::Unprovisioned;
+ case HubInstanceState::Obliterating:
+ return To == HubInstanceState::Unprovisioned || To == HubInstanceState::Crashed;
}
return false;
}(m_ActiveInstances[ActiveInstanceIndex].State.load(), NewState));
+ const std::chrono::system_clock::time_point Now = std::chrono::system_clock::now();
m_ActiveInstances[ActiveInstanceIndex].LastKnownActivitySum.store(0);
- m_ActiveInstances[ActiveInstanceIndex].LastActivityTime.store(std::chrono::system_clock::now());
+ m_ActiveInstances[ActiveInstanceIndex].LastActivityTime.store(Now);
+ m_ActiveInstances[ActiveInstanceIndex].StateChangeTime.store(Now);
return m_ActiveInstances[ActiveInstanceIndex].State.exchange(NewState);
}
@@ -1075,10 +1390,14 @@ Hub::AttemptRecoverInstance(std::string_view ModuleId)
{
StorageServerInstance::ExclusiveLockedPtr Instance;
size_t ActiveInstanceIndex = (size_t)-1;
-
{
RwLock::ExclusiveLockScope _(m_Lock);
+ if (m_ShutdownFlag.load())
+ {
+ return;
+ }
+
auto It = m_InstanceLookup.find(std::string(ModuleId));
if (It == m_InstanceLookup.end())
{
@@ -1173,14 +1492,14 @@ Hub::CheckInstanceStatus(HttpClient& ActivityCheckClient,
StorageServerInstance::SharedLockedPtr&& LockedInstance,
size_t ActiveInstanceIndex)
{
+ const std::string ModuleId(LockedInstance.GetModuleId());
+
HubInstanceState InstanceState = m_ActiveInstances[ActiveInstanceIndex].State.load();
if (LockedInstance.IsRunning())
{
- LockedInstance.UpdateMetrics();
+ m_ActiveInstances[ActiveInstanceIndex].ProcessMetrics.Store(LockedInstance.GetProcessMetrics());
if (InstanceState == HubInstanceState::Provisioned)
{
- const std::string ModuleId(LockedInstance.GetModuleId());
-
const uint16_t Port = LockedInstance.GetBasePort();
const uint64_t PreviousActivitySum = m_ActiveInstances[ActiveInstanceIndex].LastKnownActivitySum.load();
const std::chrono::system_clock::time_point LastActivityTime = m_ActiveInstances[ActiveInstanceIndex].LastActivityTime.load();
@@ -1260,8 +1579,7 @@ Hub::CheckInstanceStatus(HttpClient& ActivityCheckClient,
else if (InstanceState == HubInstanceState::Provisioned)
{
// Process is not running but state says it should be - instance died unexpectedly.
- const std::string ModuleId(LockedInstance.GetModuleId());
- const uint16_t Port = LockedInstance.GetBasePort();
+ const uint16_t Port = LockedInstance.GetBasePort();
UpdateInstanceState(LockedInstance, ActiveInstanceIndex, HubInstanceState::Crashed);
NotifyStateUpdate(ModuleId, HubInstanceState::Provisioned, HubInstanceState::Crashed, Port, {});
LockedInstance = {};
@@ -1272,7 +1590,6 @@ Hub::CheckInstanceStatus(HttpClient& ActivityCheckClient,
{
// Process is not running - no HTTP activity check is possible.
// Use a pure time-based check; the margin window does not apply here.
- const std::string ModuleId = std::string(LockedInstance.GetModuleId());
const std::chrono::system_clock::time_point LastActivityTime = m_ActiveInstances[ActiveInstanceIndex].LastActivityTime.load();
const uint64_t PreviousActivitySum = m_ActiveInstances[ActiveInstanceIndex].LastKnownActivitySum.load();
const std::chrono::system_clock::time_point Now = std::chrono::system_clock::now();
@@ -1304,7 +1621,7 @@ Hub::CheckInstanceStatus(HttpClient& ActivityCheckClient,
}
else
{
- // transitional state (Provisioning, Deprovisioning, Hibernating, Waking, Recovering) - expected, skip.
+ // transitional state (Provisioning, Deprovisioning, Hibernating, Waking, Recovering, Obliterating) - expected, skip.
// Crashed is handled above via AttemptRecoverInstance; it appears here only when the instance
// lock was busy on a previous cycle and recovery is already pending.
return true;
@@ -1312,6 +1629,43 @@ Hub::CheckInstanceStatus(HttpClient& ActivityCheckClient,
}
void
+Hub::UpdateMachineMetrics()
+{
+ try
+ {
+ bool DiskSpaceOk = false;
+ DiskSpace Disk;
+
+ std::filesystem::path ChildDir = m_RunEnvironment.GetChildBaseDir();
+ if (!ChildDir.empty())
+ {
+ if (DiskSpaceInfo(ChildDir, Disk))
+ {
+ DiskSpaceOk = true;
+ }
+ else
+ {
+ ZEN_WARN("Failed to query disk space for '{}'; disk-based provisioning limits will not be enforced", ChildDir);
+ }
+ }
+
+ SystemMetrics Metrics = GetSystemMetrics();
+
+ m_Lock.WithExclusiveLock([&]() {
+ if (DiskSpaceOk)
+ {
+ m_DiskSpace = Disk;
+ }
+ m_SystemMetrics = Metrics;
+ });
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_WARN("Failed to update machine metrics. Reason: {}", Ex.what());
+ }
+}
+
+void
Hub::WatchDog()
{
const uint64_t CycleIntervalMs = std::chrono::duration_cast<std::chrono::milliseconds>(m_Config.WatchDog.CycleInterval).count();
@@ -1326,16 +1680,18 @@ Hub::WatchDog()
[&]() -> bool { return m_WatchDogEvent.Wait(0); });
size_t CheckInstanceIndex = SIZE_MAX; // first increment wraps to 0
- while (!m_WatchDogEvent.Wait(gsl::narrow<int>(CycleIntervalMs)))
+ while (!m_ShutdownFlag.load() && !m_WatchDogEvent.Wait(gsl::narrow<int>(CycleIntervalMs)))
{
try
{
+ UpdateMachineMetrics();
+
// Snapshot slot count. We iterate all slots (including freed nulls) so
// round-robin coverage is not skewed by deprovisioned entries.
size_t SlotsRemaining = m_Lock.WithSharedLock([this]() { return m_ActiveInstances.size(); });
Stopwatch Timer;
- bool ShuttingDown = false;
+ bool ShuttingDown = m_ShutdownFlag.load();
while (SlotsRemaining > 0 && Timer.GetElapsedTimeMs() < CycleProcessingBudgetMs && !ShuttingDown)
{
StorageServerInstance::SharedLockedPtr LockedInstance;
@@ -1366,16 +1722,24 @@ Hub::WatchDog()
std::string ModuleId(LockedInstance.GetModuleId());
- bool InstanceIsOk = CheckInstanceStatus(ActivityCheckClient, std::move(LockedInstance), CheckInstanceIndex);
- if (InstanceIsOk)
+ try
{
- ShuttingDown = m_WatchDogEvent.Wait(gsl::narrow<int>(InstanceCheckThrottleMs));
+ bool InstanceIsOk = CheckInstanceStatus(ActivityCheckClient, std::move(LockedInstance), CheckInstanceIndex);
+ if (InstanceIsOk)
+ {
+ ShuttingDown = m_WatchDogEvent.Wait(gsl::narrow<int>(InstanceCheckThrottleMs));
+ }
+ else
+ {
+ ZEN_WARN("Instance for module '{}' is not running, attempting recovery", ModuleId);
+ AttemptRecoverInstance(ModuleId);
+ }
}
- else
+ catch (const std::exception& Ex)
{
- ZEN_WARN("Instance for module '{}' is not running, attempting recovery", ModuleId);
- AttemptRecoverInstance(ModuleId);
+ ZEN_WARN("Failed to check status of module {}. Reason: {}", ModuleId, Ex.what());
}
+ ShuttingDown |= m_ShutdownFlag.load();
}
}
catch (const std::exception& Ex)
@@ -1417,6 +1781,14 @@ static const HttpClientSettings kFastTimeout{.ConnectTimeout = std::chrono::mill
namespace hub_testutils {
+ struct TestHubPools
+ {
+ WorkerThreadPool ProvisionPool;
+ WorkerThreadPool HydrationPool;
+
+ explicit TestHubPools(int ThreadCount) : ProvisionPool(ThreadCount, "hub_test_prov"), HydrationPool(ThreadCount, "hub_test_hydr") {}
+ };
+
ZenServerEnvironment MakeHubEnvironment(const std::filesystem::path& BaseDir)
{
return ZenServerEnvironment(ZenServerEnvironment::Hub, GetRunningExecutablePath().parent_path(), BaseDir);
@@ -1425,9 +1797,14 @@ namespace hub_testutils {
std::unique_ptr<Hub> MakeHub(const std::filesystem::path& BaseDir,
Hub::Configuration Config = {},
Hub::AsyncModuleStateChangeCallbackFunc StateChangeCallback = {},
- WorkerThreadPool* WorkerPool = nullptr)
+ TestHubPools* Pools = nullptr)
{
- return std::make_unique<Hub>(Config, MakeHubEnvironment(BaseDir), WorkerPool, std::move(StateChangeCallback));
+ if (Pools)
+ {
+ Config.OptionalProvisionWorkerPool = &Pools->ProvisionPool;
+ Config.OptionalHydrationWorkerPool = &Pools->HydrationPool;
+ }
+ return std::make_unique<Hub>(Config, MakeHubEnvironment(BaseDir), std::move(StateChangeCallback));
}
struct CallbackRecord
@@ -1499,14 +1876,32 @@ namespace hub_testutils {
} // namespace hub_testutils
-TEST_CASE("hub.provision_basic")
+TEST_CASE("hub.provision")
{
ScopedTemporaryDirectory TempDir;
- std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path());
+
+ struct TransitionRecord
+ {
+ HubInstanceState OldState;
+ HubInstanceState NewState;
+ };
+ RwLock CaptureMutex;
+ std::vector<TransitionRecord> Transitions;
+
+ hub_testutils::StateChangeCapture CaptureInstance;
+
+ auto CaptureFunc =
+ [&](std::string_view ModuleId, const HubProvisionedInstanceInfo& Info, HubInstanceState OldState, HubInstanceState NewState) {
+ CaptureMutex.WithExclusiveLock([&]() { Transitions.push_back({OldState, NewState}); });
+ CaptureInstance.CaptureFunc()(ModuleId, Info, OldState, NewState);
+ };
+
+ std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), {}, std::move(CaptureFunc));
CHECK_EQ(HubInstance->GetInstanceCount(), 0);
CHECK_FALSE(HubInstance->Find("module_a"));
+ // Provision
HubProvisionedInstanceInfo Info;
const Hub::Response ProvisionResult = HubInstance->Provision("module_a", Info);
REQUIRE_MESSAGE(ProvisionResult.ResponseCode == Hub::EResponseCode::Completed, ProvisionResult.Message);
@@ -1515,12 +1910,23 @@ TEST_CASE("hub.provision_basic")
Hub::InstanceInfo InstanceInfo;
REQUIRE(HubInstance->Find("module_a", &InstanceInfo));
CHECK_EQ(InstanceInfo.State, HubInstanceState::Provisioned);
+ CHECK_NE(InstanceInfo.StateChangeTime, std::chrono::system_clock::time_point::min());
+ CHECK_LE(InstanceInfo.StateChangeTime, std::chrono::system_clock::now());
{
HttpClient ModClient(fmt::format("http://localhost:{}", Info.Port), kFastTimeout);
CHECK(ModClient.Get("/health/"));
}
+ // Verify provision callback
+ {
+ RwLock::SharedLockScope _(CaptureInstance.CallbackMutex);
+ REQUIRE_EQ(CaptureInstance.ProvisionCallbacks.size(), 1u);
+ CHECK_EQ(CaptureInstance.ProvisionCallbacks[0].ModuleId, "module_a");
+ CHECK_EQ(CaptureInstance.ProvisionCallbacks[0].Port, Info.Port);
+ }
+
+ // Deprovision
const Hub::Response DeprovisionResult = HubInstance->Deprovision("module_a");
CHECK(DeprovisionResult.ResponseCode == Hub::EResponseCode::Completed);
CHECK_EQ(HubInstance->GetInstanceCount(), 0);
@@ -1530,6 +1936,28 @@ TEST_CASE("hub.provision_basic")
HttpClient ModClient(fmt::format("http://localhost:{}", Info.Port), kFastTimeout);
CHECK(!ModClient.Get("/health/"));
}
+
+ // Verify deprovision callback
+ {
+ RwLock::SharedLockScope _(CaptureInstance.CallbackMutex);
+ REQUIRE_EQ(CaptureInstance.DeprovisionCallbacks.size(), 1u);
+ CHECK_EQ(CaptureInstance.DeprovisionCallbacks[0].ModuleId, "module_a");
+ CHECK_EQ(CaptureInstance.DeprovisionCallbacks[0].Port, Info.Port);
+ }
+
+ // Verify full transition sequence
+ {
+ RwLock::SharedLockScope _(CaptureMutex);
+ REQUIRE_EQ(Transitions.size(), 4u);
+ CHECK_EQ(Transitions[0].OldState, HubInstanceState::Unprovisioned);
+ CHECK_EQ(Transitions[0].NewState, HubInstanceState::Provisioning);
+ CHECK_EQ(Transitions[1].OldState, HubInstanceState::Provisioning);
+ CHECK_EQ(Transitions[1].NewState, HubInstanceState::Provisioned);
+ CHECK_EQ(Transitions[2].OldState, HubInstanceState::Provisioned);
+ CHECK_EQ(Transitions[2].NewState, HubInstanceState::Deprovisioning);
+ CHECK_EQ(Transitions[3].OldState, HubInstanceState::Deprovisioning);
+ CHECK_EQ(Transitions[3].NewState, HubInstanceState::Unprovisioned);
+ }
}
TEST_CASE("hub.provision_config")
@@ -1582,92 +2010,6 @@ TEST_CASE("hub.provision_config")
}
}
-TEST_CASE("hub.provision_callbacks")
-{
- ScopedTemporaryDirectory TempDir;
-
- hub_testutils::StateChangeCapture CaptureInstance;
-
- std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), {}, CaptureInstance.CaptureFunc());
-
- HubProvisionedInstanceInfo Info;
-
- const Hub::Response ProvisionResult = HubInstance->Provision("cb_module", Info);
- REQUIRE_MESSAGE(ProvisionResult.ResponseCode == Hub::EResponseCode::Completed, ProvisionResult.Message);
-
- {
- RwLock::SharedLockScope _(CaptureInstance.CallbackMutex);
- REQUIRE_EQ(CaptureInstance.ProvisionCallbacks.size(), 1u);
- CHECK_EQ(CaptureInstance.ProvisionCallbacks[0].ModuleId, "cb_module");
- CHECK_EQ(CaptureInstance.ProvisionCallbacks[0].Port, Info.Port);
- CHECK_NE(CaptureInstance.ProvisionCallbacks[0].Port, 0);
- }
-
- {
- HttpClient ModClient(fmt::format("http://localhost:{}", Info.Port), kFastTimeout);
- CHECK(ModClient.Get("/health/"));
- }
-
- const Hub::Response DeprovisionResult = HubInstance->Deprovision("cb_module");
- CHECK(DeprovisionResult.ResponseCode == Hub::EResponseCode::Completed);
-
- {
- HttpClient ModClient(fmt::format("http://localhost:{}", Info.Port), kFastTimeout);
- CHECK(!ModClient.Get("/health/"));
- }
-
- {
- RwLock::SharedLockScope _(CaptureInstance.CallbackMutex);
- REQUIRE_EQ(CaptureInstance.DeprovisionCallbacks.size(), 1u);
- CHECK_EQ(CaptureInstance.DeprovisionCallbacks[0].ModuleId, "cb_module");
- CHECK_EQ(CaptureInstance.DeprovisionCallbacks[0].Port, Info.Port);
- CHECK_EQ(CaptureInstance.DeprovisionCallbacks.size(), 1u);
- }
-}
-
-TEST_CASE("hub.provision_callback_sequence")
-{
- ScopedTemporaryDirectory TempDir;
-
- struct TransitionRecord
- {
- HubInstanceState OldState;
- HubInstanceState NewState;
- };
- RwLock CaptureMutex;
- std::vector<TransitionRecord> Transitions;
-
- auto CaptureFunc =
- [&](std::string_view ModuleId, const HubProvisionedInstanceInfo& Info, HubInstanceState OldState, HubInstanceState NewState) {
- ZEN_UNUSED(ModuleId);
- ZEN_UNUSED(Info);
- CaptureMutex.WithExclusiveLock([&]() { Transitions.push_back({OldState, NewState}); });
- };
-
- std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), {}, std::move(CaptureFunc));
-
- HubProvisionedInstanceInfo Info;
- {
- const Hub::Response R = HubInstance->Provision("seq_module", Info);
- REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message);
- }
- {
- const Hub::Response R = HubInstance->Deprovision("seq_module");
- REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message);
- }
-
- RwLock::SharedLockScope _(CaptureMutex);
- REQUIRE_EQ(Transitions.size(), 4u);
- CHECK_EQ(Transitions[0].OldState, HubInstanceState::Unprovisioned);
- CHECK_EQ(Transitions[0].NewState, HubInstanceState::Provisioning);
- CHECK_EQ(Transitions[1].OldState, HubInstanceState::Provisioning);
- CHECK_EQ(Transitions[1].NewState, HubInstanceState::Provisioned);
- CHECK_EQ(Transitions[2].OldState, HubInstanceState::Provisioned);
- CHECK_EQ(Transitions[2].NewState, HubInstanceState::Deprovisioning);
- CHECK_EQ(Transitions[3].OldState, HubInstanceState::Deprovisioning);
- CHECK_EQ(Transitions[3].NewState, HubInstanceState::Unprovisioned);
-}
-
TEST_CASE("hub.instance_limit")
{
ScopedTemporaryDirectory TempDir;
@@ -1699,54 +2041,7 @@ TEST_CASE("hub.instance_limit")
CHECK_EQ(HubInstance->GetInstanceCount(), 2);
}
-TEST_CASE("hub.enumerate_modules")
-{
- ScopedTemporaryDirectory TempDir;
- std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path());
-
- HubProvisionedInstanceInfo Info;
-
- {
- const Hub::Response R = HubInstance->Provision("enum_a", Info);
- REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message);
- }
- {
- const Hub::Response R = HubInstance->Provision("enum_b", Info);
- REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message);
- }
-
- std::vector<std::string> Ids;
- int ProvisionedCount = 0;
- HubInstance->EnumerateModules([&](std::string_view ModuleId, const Hub::InstanceInfo& InstanceInfo) {
- Ids.push_back(std::string(ModuleId));
- if (InstanceInfo.State == HubInstanceState::Provisioned)
- {
- ProvisionedCount++;
- }
- });
- CHECK_EQ(Ids.size(), 2u);
- CHECK_EQ(ProvisionedCount, 2);
- const bool FoundA = std::find(Ids.begin(), Ids.end(), "enum_a") != Ids.end();
- const bool FoundB = std::find(Ids.begin(), Ids.end(), "enum_b") != Ids.end();
- CHECK(FoundA);
- CHECK(FoundB);
-
- HubInstance->Deprovision("enum_a");
- Ids.clear();
- ProvisionedCount = 0;
- HubInstance->EnumerateModules([&](std::string_view ModuleId, const Hub::InstanceInfo& InstanceInfo) {
- Ids.push_back(std::string(ModuleId));
- if (InstanceInfo.State == HubInstanceState::Provisioned)
- {
- ProvisionedCount++;
- }
- });
- REQUIRE_EQ(Ids.size(), 1u);
- CHECK_EQ(Ids[0], "enum_b");
- CHECK_EQ(ProvisionedCount, 1);
-}
-
-TEST_CASE("hub.max_instance_count")
+TEST_CASE("hub.enumerate_and_instance_tracking")
{
ScopedTemporaryDirectory TempDir;
std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path());
@@ -1756,22 +2051,56 @@ TEST_CASE("hub.max_instance_count")
HubProvisionedInstanceInfo Info;
{
- const Hub::Response R = HubInstance->Provision("max_a", Info);
+ const Hub::Response R = HubInstance->Provision("track_a", Info);
REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message);
}
CHECK_GE(HubInstance->GetMaxInstanceCount(), 1);
{
- const Hub::Response R = HubInstance->Provision("max_b", Info);
+ const Hub::Response R = HubInstance->Provision("track_b", Info);
REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message);
}
CHECK_GE(HubInstance->GetMaxInstanceCount(), 2);
+ // Enumerate both modules
+ {
+ std::vector<std::string> Ids;
+ int ProvisionedCount = 0;
+ HubInstance->EnumerateModules([&](std::string_view ModuleId, const Hub::InstanceInfo& InstanceInfo) {
+ Ids.push_back(std::string(ModuleId));
+ if (InstanceInfo.State == HubInstanceState::Provisioned)
+ {
+ ProvisionedCount++;
+ }
+ });
+ CHECK_EQ(Ids.size(), 2u);
+ CHECK_EQ(ProvisionedCount, 2);
+ CHECK(std::find(Ids.begin(), Ids.end(), "track_a") != Ids.end());
+ CHECK(std::find(Ids.begin(), Ids.end(), "track_b") != Ids.end());
+ }
+
const int MaxAfterTwo = HubInstance->GetMaxInstanceCount();
- HubInstance->Deprovision("max_a");
+ // Deprovision one - max instance count must not decrease
+ HubInstance->Deprovision("track_a");
CHECK_EQ(HubInstance->GetInstanceCount(), 1);
CHECK_EQ(HubInstance->GetMaxInstanceCount(), MaxAfterTwo);
+
+ // Enumerate after deprovision
+ {
+ std::vector<std::string> Ids;
+ int ProvisionedCount = 0;
+ HubInstance->EnumerateModules([&](std::string_view ModuleId, const Hub::InstanceInfo& InstanceInfo) {
+ Ids.push_back(std::string(ModuleId));
+ if (InstanceInfo.State == HubInstanceState::Provisioned)
+ {
+ ProvisionedCount++;
+ }
+ });
+ REQUIRE_EQ(Ids.size(), 1u);
+ CHECK_EQ(Ids[0], "track_b");
+ CHECK_EQ(ProvisionedCount, 1);
+ }
}
TEST_CASE("hub.concurrent_callbacks")
@@ -1917,7 +2246,7 @@ TEST_CASE("hub.job_object")
}
# endif // ZEN_PLATFORM_WINDOWS
-TEST_CASE("hub.hibernate_wake")
+TEST_CASE("hub.hibernate_wake_obliterate")
{
ScopedTemporaryDirectory TempDir;
Hub::Configuration Config;
@@ -1927,6 +2256,11 @@ TEST_CASE("hub.hibernate_wake")
HubProvisionedInstanceInfo ProvInfo;
Hub::InstanceInfo Info;
+ // Error cases on non-existent modules (no provision needed)
+ CHECK(HubInstance->Hibernate("never_provisioned").ResponseCode == Hub::EResponseCode::NotFound);
+ CHECK(HubInstance->Wake("never_provisioned").ResponseCode == Hub::EResponseCode::NotFound);
+ CHECK(HubInstance->Deprovision("never_provisioned").ResponseCode == Hub::EResponseCode::NotFound);
+
// Provision
{
const Hub::Response R = HubInstance->Provision("hib_a", ProvInfo);
@@ -1934,82 +2268,104 @@ TEST_CASE("hub.hibernate_wake")
}
REQUIRE(HubInstance->Find("hib_a", &Info));
CHECK_EQ(Info.State, HubInstanceState::Provisioned);
+ const std::chrono::system_clock::time_point ProvisionedTime = Info.StateChangeTime;
+ CHECK_NE(ProvisionedTime, std::chrono::system_clock::time_point::min());
+ CHECK_LE(ProvisionedTime, std::chrono::system_clock::now());
{
HttpClient ModClient(fmt::format("http://localhost:{}", ProvInfo.Port), kFastTimeout);
CHECK(ModClient.Get("/health/"));
}
+ // Double-wake on provisioned module is idempotent
+ CHECK(HubInstance->Wake("hib_a").ResponseCode == Hub::EResponseCode::Completed);
+
// Hibernate
- const Hub::Response HibernateResult = HubInstance->Hibernate("hib_a");
- REQUIRE_MESSAGE(HibernateResult.ResponseCode == Hub::EResponseCode::Completed, HibernateResult.Message);
+ {
+ const Hub::Response R = HubInstance->Hibernate("hib_a");
+ REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message);
+ }
REQUIRE(HubInstance->Find("hib_a", &Info));
CHECK_EQ(Info.State, HubInstanceState::Hibernated);
+ const std::chrono::system_clock::time_point HibernatedTime = Info.StateChangeTime;
+ CHECK_GE(HibernatedTime, ProvisionedTime);
{
HttpClient ModClient(fmt::format("http://localhost:{}", ProvInfo.Port), kFastTimeout);
CHECK(!ModClient.Get("/health/"));
}
+ // Double-hibernate on already-hibernated module is idempotent
+ CHECK(HubInstance->Hibernate("hib_a").ResponseCode == Hub::EResponseCode::Completed);
+
// Wake
- const Hub::Response WakeResult = HubInstance->Wake("hib_a");
- REQUIRE_MESSAGE(WakeResult.ResponseCode == Hub::EResponseCode::Completed, WakeResult.Message);
+ {
+ const Hub::Response R = HubInstance->Wake("hib_a");
+ REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message);
+ }
REQUIRE(HubInstance->Find("hib_a", &Info));
CHECK_EQ(Info.State, HubInstanceState::Provisioned);
+ CHECK_GE(Info.StateChangeTime, HibernatedTime);
{
HttpClient ModClient(fmt::format("http://localhost:{}", ProvInfo.Port), kFastTimeout);
CHECK(ModClient.Get("/health/"));
}
- // Deprovision
- const Hub::Response DeprovisionResult = HubInstance->Deprovision("hib_a");
- CHECK(DeprovisionResult.ResponseCode == Hub::EResponseCode::Completed);
- CHECK_FALSE(HubInstance->Find("hib_a"));
+ // Hibernate again for obliterate-from-hibernated test
{
- HttpClient ModClient(fmt::format("http://localhost:{}", ProvInfo.Port), kFastTimeout);
- CHECK(!ModClient.Get("/health/"));
+ const Hub::Response R = HubInstance->Hibernate("hib_a");
+ REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message);
}
-}
-
-TEST_CASE("hub.hibernate_wake_errors")
-{
- ScopedTemporaryDirectory TempDir;
- Hub::Configuration Config;
- Config.BasePortNumber = 22700;
- std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config);
-
- HubProvisionedInstanceInfo ProvInfo;
+ REQUIRE(HubInstance->Find("hib_a", &Info));
+ CHECK_EQ(Info.State, HubInstanceState::Hibernated);
- // Hibernate/wake on a non-existent module - returns NotFound (-> 404)
- CHECK(HubInstance->Hibernate("never_provisioned").ResponseCode == Hub::EResponseCode::NotFound);
- CHECK(HubInstance->Wake("never_provisioned").ResponseCode == Hub::EResponseCode::NotFound);
+ // Obliterate from hibernated
+ {
+ const Hub::Response R = HubInstance->Obliterate("hib_a");
+ CHECK(R.ResponseCode == Hub::EResponseCode::Completed);
+ }
+ CHECK_EQ(HubInstance->GetInstanceCount(), 0);
+ CHECK_FALSE(HubInstance->Find("hib_a"));
- // Double-hibernate: second hibernate on already-hibernated module returns Completed (idempotent)
+ // Re-provision for obliterate-from-provisioned test
{
- const Hub::Response R = HubInstance->Provision("err_b", ProvInfo);
+ const Hub::Response R = HubInstance->Provision("hib_a", ProvInfo);
REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message);
}
+ REQUIRE(HubInstance->Find("hib_a", &Info));
+ CHECK_EQ(Info.State, HubInstanceState::Provisioned);
{
- const Hub::Response R = HubInstance->Hibernate("err_b");
- REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message);
+ HttpClient ModClient(fmt::format("http://localhost:{}", ProvInfo.Port), kFastTimeout);
+ CHECK(ModClient.Get("/health/"));
}
+ // Obliterate from provisioned
+ {
+ const Hub::Response R = HubInstance->Obliterate("hib_a");
+ CHECK(R.ResponseCode == Hub::EResponseCode::Completed);
+ }
+ CHECK_EQ(HubInstance->GetInstanceCount(), 0);
+ CHECK_FALSE(HubInstance->Find("hib_a"));
{
- const Hub::Response HibResp = HubInstance->Hibernate("err_b");
- CHECK(HibResp.ResponseCode == Hub::EResponseCode::Completed);
+ HttpClient ModClient(fmt::format("http://localhost:{}", ProvInfo.Port), kFastTimeout);
+ CHECK(!ModClient.Get("/health/"));
}
- // Wake on provisioned: succeeds (-> Provisioned), then wake again returns Completed (idempotent)
+ // Obliterate deprovisioned module (not tracked by hub, backend data may exist)
{
- const Hub::Response R = HubInstance->Wake("err_b");
+ const Hub::Response R = HubInstance->Provision("hib_a", ProvInfo);
REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message);
}
-
{
- const Hub::Response WakeResp = HubInstance->Wake("err_b");
- CHECK(WakeResp.ResponseCode == Hub::EResponseCode::Completed);
+ const Hub::Response R = HubInstance->Deprovision("hib_a");
+ REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message);
+ }
+ CHECK_FALSE(HubInstance->Find("hib_a"));
+ {
+ const Hub::Response R = HubInstance->Obliterate("hib_a");
+ CHECK(R.ResponseCode == Hub::EResponseCode::Completed);
}
- // Deprovision not-found - returns NotFound (-> 404)
- CHECK(HubInstance->Deprovision("never_provisioned").ResponseCode == Hub::EResponseCode::NotFound);
+ // Obliterate of a never-provisioned module also succeeds (no-op backend cleanup)
+ CHECK(HubInstance->Obliterate("never_existed").ResponseCode == Hub::EResponseCode::Completed);
}
TEST_CASE("hub.async_hibernate_wake")
@@ -2019,8 +2375,8 @@ TEST_CASE("hub.async_hibernate_wake")
Hub::Configuration Config;
Config.BasePortNumber = 23000;
- WorkerThreadPool WorkerPool(2, "hub_async_hib_wake");
- std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config, {}, &WorkerPool);
+ hub_testutils::TestHubPools Pools(2);
+ std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config, {}, &Pools);
HubProvisionedInstanceInfo ProvInfo;
Hub::InstanceInfo Info;
@@ -2150,25 +2506,21 @@ TEST_CASE("hub.recover_process_crash")
if (HubInstance->Find("module_a", &InstanceInfo) && InstanceInfo.State == HubInstanceState::Provisioned &&
ModClient.Get("/health/"))
{
- // Recovery must reuse the same port - the instance was never removed from the hub's
- // port table during recovery, so AttemptRecoverInstance reuses m_Config.BasePort.
CHECK_EQ(InstanceInfo.Port, Info.Port);
Recovered = true;
break;
}
}
- CHECK_MESSAGE(Recovered, "Instance did not recover within timeout");
+ REQUIRE_MESSAGE(Recovered, "Instance did not recover within timeout");
// Verify the full crash/recovery callback sequence
{
RwLock::SharedLockScope _(CaptureMutex);
REQUIRE_GE(Transitions.size(), 3u);
- // Find the Provisioned->Crashed transition
const auto CrashedIt = std::find_if(Transitions.begin(), Transitions.end(), [](const TransitionRecord& R) {
return R.OldState == HubInstanceState::Provisioned && R.NewState == HubInstanceState::Crashed;
});
REQUIRE_NE(CrashedIt, Transitions.end());
- // Recovery sequence follows: Crashed->Recovering, Recovering->Provisioned
const auto RecoveringIt = CrashedIt + 1;
REQUIRE_NE(RecoveringIt, Transitions.end());
CHECK_EQ(RecoveringIt->OldState, HubInstanceState::Crashed);
@@ -2178,44 +2530,6 @@ TEST_CASE("hub.recover_process_crash")
CHECK_EQ(RecoveredIt->OldState, HubInstanceState::Recovering);
CHECK_EQ(RecoveredIt->NewState, HubInstanceState::Provisioned);
}
-}
-
-TEST_CASE("hub.recover_process_crash_then_deprovision")
-{
- ScopedTemporaryDirectory TempDir;
-
- // Fast watchdog cycle so crash detection is near-instant instead of waiting up to the 3s default.
- Hub::Configuration Config;
- Config.WatchDog.CycleInterval = std::chrono::milliseconds(10);
- Config.WatchDog.InstanceCheckThrottle = std::chrono::milliseconds(1);
-
- std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config);
-
- HubProvisionedInstanceInfo Info;
- {
- const Hub::Response R = HubInstance->Provision("module_a", Info);
- REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message);
- }
-
- // Kill the child process, wait for the watchdog to detect and recover the instance.
- HubInstance->TerminateModuleForTesting("module_a");
-
- constexpr auto kPollIntervalMs = std::chrono::milliseconds(50);
- constexpr auto kTimeoutMs = std::chrono::seconds(15);
- const auto Deadline = std::chrono::steady_clock::now() + kTimeoutMs;
-
- bool Recovered = false;
- while (std::chrono::steady_clock::now() < Deadline)
- {
- std::this_thread::sleep_for(kPollIntervalMs);
- Hub::InstanceInfo InstanceInfo;
- if (HubInstance->Find("module_a", &InstanceInfo) && InstanceInfo.State == HubInstanceState::Provisioned)
- {
- Recovered = true;
- break;
- }
- }
- REQUIRE_MESSAGE(Recovered, "Instance did not recover within timeout");
// After recovery, deprovision should succeed and a re-provision should work.
{
@@ -2244,8 +2558,8 @@ TEST_CASE("hub.async_provision_concurrent")
Config.BasePortNumber = 22800;
Config.InstanceLimit = kModuleCount;
- WorkerThreadPool WorkerPool(4, "hub_async_concurrent");
- std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config, {}, &WorkerPool);
+ hub_testutils::TestHubPools Pools(4);
+ std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config, {}, &Pools);
std::vector<HubProvisionedInstanceInfo> Infos(kModuleCount);
std::vector<std::string> Reasons(kModuleCount);
@@ -2326,8 +2640,8 @@ TEST_CASE("hub.async_provision_shutdown_waits")
Config.InstanceLimit = kModuleCount;
Config.BasePortNumber = 22900;
- WorkerThreadPool WorkerPool(2, "hub_async_shutdown");
- std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config, {}, &WorkerPool);
+ hub_testutils::TestHubPools Pools(2);
+ std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config, {}, &Pools);
std::vector<HubProvisionedInstanceInfo> Infos(kModuleCount);
@@ -2352,15 +2666,15 @@ TEST_CASE("hub.async_provision_shutdown_waits")
TEST_CASE("hub.async_provision_rejected")
{
- // Rejection from CanProvisionInstance fires synchronously even when a WorkerPool is present.
+ // Rejection from CanProvisionInstanceLocked fires synchronously even when a WorkerPool is present.
ScopedTemporaryDirectory TempDir;
Hub::Configuration Config;
Config.InstanceLimit = 1;
Config.BasePortNumber = 23100;
- WorkerThreadPool WorkerPool(2, "hub_async_rejected");
- std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config, {}, &WorkerPool);
+ hub_testutils::TestHubPools Pools(2);
+ std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config, {}, &Pools);
HubProvisionedInstanceInfo Info;
@@ -2369,7 +2683,7 @@ TEST_CASE("hub.async_provision_rejected")
REQUIRE_MESSAGE(FirstResult.ResponseCode == Hub::EResponseCode::Accepted, FirstResult.Message);
REQUIRE_NE(Info.Port, 0);
- // Second provision: CanProvisionInstance rejects synchronously (limit reached), returns Rejected
+ // Second provision: CanProvisionInstanceLocked rejects synchronously (limit reached), returns Rejected
HubProvisionedInstanceInfo Info2;
const Hub::Response SecondResult = HubInstance->Provision("async_r2", Info2);
CHECK(SecondResult.ResponseCode == Hub::EResponseCode::Rejected);
@@ -2448,12 +2762,12 @@ TEST_CASE("hub.instance.inactivity.deprovision")
// Phase 1: immediately after setup all three instances must still be alive.
// No timeout has elapsed yet (only 100ms have passed).
- CHECK_MESSAGE(HubInstance->Find("idle"), "idle was deprovisioned within 100ms - its 2s provisioned timeout has not elapsed");
+ CHECK_MESSAGE(HubInstance->Find("idle"), "idle was deprovisioned within 100ms - its 4s provisioned timeout has not elapsed");
CHECK_MESSAGE(HubInstance->Find("idle_hib"), "idle_hib was deprovisioned within 100ms - its 1s hibernated timeout has not elapsed");
CHECK_MESSAGE(HubInstance->Find("persistent"),
- "persistent was deprovisioned within 100ms - its 2s provisioned timeout has not elapsed");
+ "persistent was deprovisioned within 100ms - its 4s provisioned timeout has not elapsed");
// Phase 2: idle_hib must be deprovisioned by the watchdog within its 1s hibernated timeout.
// idle must remain alive - its 2s provisioned timeout has not elapsed yet.
@@ -2477,7 +2791,7 @@ TEST_CASE("hub.instance.inactivity.deprovision")
CHECK_MESSAGE(!HubInstance->Find("idle_hib"), "idle_hib should still be gone - it was deprovisioned in phase 2");
- CHECK_MESSAGE(!HubInstance->Find("idle"), "idle should be gone after its 3s provisioned timeout elapsed");
+ CHECK_MESSAGE(!HubInstance->Find("idle"), "idle should be gone after its 4s provisioned timeout elapsed");
CHECK_MESSAGE(HubInstance->Find("persistent"),
"persistent was incorrectly deprovisioned - its activity timer was reset by PokeInstance");
@@ -2485,6 +2799,55 @@ TEST_CASE("hub.instance.inactivity.deprovision")
HubInstance->Shutdown();
}
+TEST_CASE("hub.machine_metrics")
+{
+ ScopedTemporaryDirectory TempDir;
+
+ std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), {});
+
+ // UpdateMachineMetrics() is called synchronously in the Hub constructor, so metrics
+ // are available immediately without waiting for a watchdog cycle.
+ SystemMetrics SysMetrics;
+ DiskSpace Disk;
+ HubInstance->GetMachineMetrics(SysMetrics, Disk);
+
+ CHECK_GT(Disk.Total, 0u);
+ CHECK_LE(Disk.Free, Disk.Total);
+
+ CHECK_GT(SysMetrics.SystemMemoryMiB, 0u);
+ CHECK_LE(SysMetrics.AvailSystemMemoryMiB, SysMetrics.SystemMemoryMiB);
+
+ CHECK_GT(SysMetrics.VirtualMemoryMiB, 0u);
+ CHECK_LE(SysMetrics.AvailVirtualMemoryMiB, SysMetrics.VirtualMemoryMiB);
+}
+
+TEST_CASE("hub.provision_rejected_resource_limits")
+{
+ // The Hub constructor calls UpdateMachineMetrics() synchronously, so CanProvisionInstanceLocked
+ // can enforce limits immediately without waiting for a watchdog cycle.
+ ScopedTemporaryDirectory TempDir;
+
+ {
+ Hub::Configuration Config;
+ Config.ResourceLimits.DiskUsageBytes = 1;
+ std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config);
+ HubProvisionedInstanceInfo Info;
+ const Hub::Response Result = HubInstance->Provision("disk_limit", Info);
+ CHECK(Result.ResponseCode == Hub::EResponseCode::Rejected);
+ CHECK_NE(Result.Message.find("disk usage"), std::string::npos);
+ }
+
+ {
+ Hub::Configuration Config;
+ Config.ResourceLimits.MemoryUsageBytes = 1;
+ std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config);
+ HubProvisionedInstanceInfo Info;
+ const Hub::Response Result = HubInstance->Provision("mem_limit", Info);
+ CHECK(Result.ResponseCode == Hub::EResponseCode::Rejected);
+ CHECK_NE(Result.Message.find("ram usage"), std::string::npos);
+ }
+}
+
TEST_SUITE_END();
void
diff --git a/src/zenserver/hub/hub.h b/src/zenserver/hub/hub.h
index c343b19e2..40d046ce0 100644
--- a/src/zenserver/hub/hub.h
+++ b/src/zenserver/hub/hub.h
@@ -6,6 +6,8 @@
#include "resourcemetrics.h"
#include "storageserverinstance.h"
+#include <zencore/compactbinary.h>
+#include <zencore/filesystem.h>
#include <zencore/system.h>
#include <zenutil/zenserverprocess.h>
@@ -16,6 +18,7 @@
#include <memory>
#include <thread>
#include <unordered_map>
+#include <unordered_set>
namespace zen {
@@ -64,10 +67,20 @@ public:
uint32_t InstanceHttpThreadCount = 0; // Automatic
int InstanceCoreLimit = 0; // Automatic
+ std::string InstanceMalloc;
+ std::string InstanceTrace;
+ std::string InstanceTraceHost;
+ std::string InstanceTraceFile;
std::filesystem::path InstanceConfigPath;
std::string HydrationTargetSpecification;
+ CbObject HydrationOptions;
WatchDogConfiguration WatchDog;
+
+ ResourceMetrics ResourceLimits;
+
+ WorkerThreadPool* OptionalProvisionWorkerPool = nullptr;
+ WorkerThreadPool* OptionalHydrationWorkerPool = nullptr;
};
typedef std::function<
@@ -76,7 +89,6 @@ public:
Hub(const Configuration& Config,
ZenServerEnvironment&& RunEnvironment,
- WorkerThreadPool* OptionalWorkerPool = nullptr,
AsyncModuleStateChangeCallbackFunc&& ModuleStateChangeCallback = {});
~Hub();
@@ -86,7 +98,7 @@ public:
struct InstanceInfo
{
HubInstanceState State = HubInstanceState::Unprovisioned;
- std::chrono::system_clock::time_point ProvisionTime;
+ std::chrono::system_clock::time_point StateChangeTime;
ProcessMetrics Metrics;
uint16_t Port = 0;
};
@@ -126,6 +138,14 @@ public:
Response Deprovision(const std::string& ModuleId);
/**
+ * Obliterate a storage server instance and all associated data.
+ * Shuts down the process, deletes backend hydration data, and cleans local state.
+ *
+ * @param ModuleId The ID of the module to obliterate.
+ */
+ Response Obliterate(const std::string& ModuleId);
+
+ /**
* Hibernate a storage server instance for the given module ID.
* The instance is shut down but its data is preserved; it can be woken later.
*
@@ -160,6 +180,10 @@ public:
int GetMaxInstanceCount() const { return m_MaxInstanceCount.load(); }
+ void GetMachineMetrics(SystemMetrics& OutSystemMetrict, DiskSpace& OutDiskSpace) const;
+
+ bool IsInstancePort(uint16_t Port) const;
+
const Configuration& GetConfig() const { return m_Config; }
#if ZEN_WITH_TESTS
@@ -175,15 +199,31 @@ private:
AsyncModuleStateChangeCallbackFunc m_ModuleStateChangeCallback;
- std::string m_HydrationTargetSpecification;
- std::filesystem::path m_HydrationTempPath;
+ std::unique_ptr<HydrationBase> m_Hydration;
+ std::filesystem::path m_HydrationTempPath;
#if ZEN_PLATFORM_WINDOWS
JobObject m_JobObject;
#endif
- RwLock m_Lock;
+ mutable RwLock m_Lock;
std::unordered_map<std::string, size_t> m_InstanceLookup;
+ // Mirrors ProcessMetrics with atomic fields, enabling lock-free reads alongside watchdog writes.
+ struct AtomicProcessMetrics
+ {
+ std::atomic<uint64_t> MemoryBytes = 0;
+ std::atomic<uint64_t> KernelTimeMs = 0;
+ std::atomic<uint64_t> UserTimeMs = 0;
+ std::atomic<uint64_t> WorkingSetSize = 0;
+ std::atomic<uint64_t> PeakWorkingSetSize = 0;
+ std::atomic<uint64_t> PagefileUsage = 0;
+ std::atomic<uint64_t> PeakPagefileUsage = 0;
+
+ ProcessMetrics Load() const;
+ void Store(const ProcessMetrics& Metrics);
+ void Reset();
+ };
+
struct ActiveInstance
{
// Invariant: Instance == nullptr if and only if State == Unprovisioned.
@@ -192,11 +232,16 @@ private:
// without holding the hub lock.
std::unique_ptr<StorageServerInstance> Instance;
std::atomic<HubInstanceState> State = HubInstanceState::Unprovisioned;
- // TODO: We should move current metrics here (from StorageServerInstance)
- // Read and updated by WatchDog, updates to State triggers a reset of both
+ // Process metrics - written by WatchDog (inside instance shared lock), read lock-free.
+ AtomicProcessMetrics ProcessMetrics;
+
+ // Activity tracking - written by WatchDog, reset on every state transition.
std::atomic<uint64_t> LastKnownActivitySum = 0;
std::atomic<std::chrono::system_clock::time_point> LastActivityTime = std::chrono::system_clock::time_point::min();
+
+ // Set in UpdateInstanceStateLocked on every state transition; read lock-free by Find/EnumerateModules.
+ std::atomic<std::chrono::system_clock::time_point> StateChangeTime = std::chrono::system_clock::time_point::min();
};
// UpdateInstanceState is overloaded to accept a locked instance pointer (exclusive or shared) or the hub exclusive
@@ -224,23 +269,23 @@ private:
}
HubInstanceState UpdateInstanceStateLocked(size_t ActiveInstanceIndex, HubInstanceState NewState);
- std::vector<ActiveInstance> m_ActiveInstances;
- std::deque<size_t> m_FreeActiveInstanceIndexes;
- ResourceMetrics m_ResourceLimits;
- SystemMetrics m_HostMetrics;
- std::atomic<int> m_MaxInstanceCount = 0;
- std::thread m_WatchDog;
+ std::vector<ActiveInstance> m_ActiveInstances;
+ std::deque<size_t> m_FreeActiveInstanceIndexes;
+ SystemMetrics m_SystemMetrics;
+ DiskSpace m_DiskSpace;
+ std::atomic<int> m_MaxInstanceCount = 0;
+ std::thread m_WatchDog;
+ std::unordered_set<std::string> m_ObliteratingInstances;
Event m_WatchDogEvent;
void WatchDog();
+ void UpdateMachineMetrics();
bool CheckInstanceStatus(HttpClient& ActivityHttpClient,
StorageServerInstance::SharedLockedPtr&& LockedInstance,
size_t ActiveInstanceIndex);
void AttemptRecoverInstance(std::string_view ModuleId);
- void UpdateStats();
- void UpdateCapacityMetrics();
- bool CanProvisionInstance(std::string_view ModuleId, std::string& OutReason);
+ bool CanProvisionInstanceLocked(std::string_view ModuleId, std::string& OutReason);
uint16_t GetInstanceIndexAssignedPort(size_t ActiveInstanceIndex) const;
Response InternalDeprovision(const std::string& ModuleId, std::function<bool(ActiveInstance& Instance)>&& DeprovisionGate);
@@ -248,9 +293,12 @@ private:
size_t ActiveInstanceIndex,
HubInstanceState OldState,
bool IsNewInstance);
- void CompleteDeprovision(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex);
- void CompleteHibernate(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex, HubInstanceState OldState);
- void CompleteWake(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex, HubInstanceState OldState);
+ void CompleteDeprovision(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex, HubInstanceState OldState);
+ void CompleteObliterate(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex);
+ void CompleteHibernate(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex, HubInstanceState OldState);
+ void CompleteWake(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex, HubInstanceState OldState);
+ void RemoveInstance(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex, std::string_view ModuleId);
+ void ObliterateBackendData(std::string_view ModuleId);
// Notifications may fire slightly out of sync with the Hub's internal State flag.
// The guarantee is that notifications are sent in the correct order, but the State
diff --git a/src/zenserver/hub/hubinstancestate.cpp b/src/zenserver/hub/hubinstancestate.cpp
index c47fdd294..310305e5d 100644
--- a/src/zenserver/hub/hubinstancestate.cpp
+++ b/src/zenserver/hub/hubinstancestate.cpp
@@ -29,6 +29,8 @@ ToString(HubInstanceState State)
return "crashed";
case HubInstanceState::Recovering:
return "recovering";
+ case HubInstanceState::Obliterating:
+ return "obliterating";
}
ZEN_ASSERT(false);
return "unknown";
diff --git a/src/zenserver/hub/hubinstancestate.h b/src/zenserver/hub/hubinstancestate.h
index c895f75d1..c7188aa5c 100644
--- a/src/zenserver/hub/hubinstancestate.h
+++ b/src/zenserver/hub/hubinstancestate.h
@@ -20,7 +20,8 @@ enum class HubInstanceState : uint32_t
Hibernating, // Provisioned -> Hibernated (Shutting down process, preserving data on disk)
Waking, // Hibernated -> Provisioned (Starting process from preserved data)
Deprovisioning, // Provisioned/Hibernated/Crashed -> Unprovisioned (Shutting down process and cleaning up data)
- Recovering, // Crashed -> Provisioned/Deprovisioned (Attempting in-place restart after a crash)
+ Recovering, // Crashed -> Provisioned/Unprovisioned (Attempting in-place restart after a crash)
+ Obliterating, // Provisioned/Hibernated/Crashed -> Unprovisioned (Destroying all local and backend data)
};
std::string_view ToString(HubInstanceState State);
diff --git a/src/zenserver/hub/hydration.cpp b/src/zenserver/hub/hydration.cpp
index 541127590..2f224c357 100644
--- a/src/zenserver/hub/hydration.cpp
+++ b/src/zenserver/hub/hydration.cpp
@@ -5,20 +5,25 @@
#include <zencore/basicfile.h>
#include <zencore/compactbinary.h>
#include <zencore/compactbinarybuilder.h>
+#include <zencore/compactbinaryutil.h>
+#include <zencore/compress.h>
#include <zencore/except_fmt.h>
#include <zencore/filesystem.h>
#include <zencore/fmtutils.h>
#include <zencore/logging.h>
+#include <zencore/parallelwork.h>
+#include <zencore/stream.h>
#include <zencore/system.h>
+#include <zencore/timer.h>
#include <zenutil/cloud/imdscredentials.h>
#include <zenutil/cloud/s3client.h>
+#include <zenutil/filesystemutils.h>
-ZEN_THIRD_PARTY_INCLUDES_START
-#include <json11.hpp>
-ZEN_THIRD_PARTY_INCLUDES_END
+#include <numeric>
+#include <unordered_map>
+#include <unordered_set>
#if ZEN_WITH_TESTS
-# include <zencore/parallelwork.h>
# include <zencore/testing.h>
# include <zencore/testutils.h>
# include <zencore/thread.h>
@@ -29,7 +34,7 @@ ZEN_THIRD_PARTY_INCLUDES_END
namespace zen {
-namespace {
+namespace hydration_impl {
/// UTC time decomposed to calendar fields with sub-second milliseconds.
struct UtcTime
@@ -55,537 +60,1161 @@ namespace {
}
};
-} // namespace
-
-///////////////////////////////////////////////////////////////////////////
-
-constexpr std::string_view FileHydratorPrefix = "file://";
-
-struct FileHydrator : public HydrationStrategyBase
-{
- virtual void Configure(const HydrationConfig& Config) override;
- virtual void Hydrate() override;
- virtual void Dehydrate() override;
-
-private:
- HydrationConfig m_Config;
- std::filesystem::path m_StorageModuleRootDir;
-};
-
-void
-FileHydrator::Configure(const HydrationConfig& Config)
-{
- m_Config = Config;
-
- std::filesystem::path ConfigPath(Utf8ToWide(m_Config.TargetSpecification.substr(FileHydratorPrefix.length())));
- MakeSafeAbsolutePathInPlace(ConfigPath);
+ std::filesystem::path FastRelativePath(const std::filesystem::path& Root, const std::filesystem::path& Abs)
+ {
+ auto [_, ItAbs] = std::mismatch(Root.begin(), Root.end(), Abs.begin(), Abs.end());
+ std::filesystem::path RelativePath;
+ for (auto I = ItAbs; I != Abs.end(); I++)
+ {
+ RelativePath = RelativePath / *I;
+ }
+ return RelativePath;
+ }
- if (!std::filesystem::exists(ConfigPath))
+ void CleanDirectory(WorkerThreadPool& WorkerPool,
+ std::atomic<bool>& AbortFlag,
+ std::atomic<bool>& PauseFlag,
+ const std::filesystem::path& Path)
{
- throw std::invalid_argument(fmt::format("Target does not exist: '{}'", ConfigPath.string()));
+ CleanDirectory(WorkerPool, AbortFlag, PauseFlag, Path, std::vector<std::string>{}, {}, 0);
}
- m_StorageModuleRootDir = ConfigPath / m_Config.ModuleId;
+ ///////////////////////////////////////////////////////////////////////
+ // Per-module storage interface driven by IncrementalHydrator.
- CreateDirectories(m_StorageModuleRootDir);
-}
+ class StorageBase
+ {
+ public:
+ virtual ~StorageBase() = default;
+
+ virtual void SaveMetadata(const CbObject& Data) = 0;
+ virtual CbObject LoadMetadata() = 0;
+ virtual CbObject GetSettings() = 0;
+ virtual void ParseSettings(const CbObjectView& Settings) = 0;
+ virtual std::vector<IoHash> List() = 0;
+ virtual void Put(ParallelWork& Work,
+ WorkerThreadPool& WorkerPool,
+ const IoHash& Hash,
+ uint64_t Size,
+ const std::filesystem::path& SourcePath) = 0;
+ virtual void Get(ParallelWork& Work,
+ WorkerThreadPool& WorkerPool,
+ const IoHash& Hash,
+ uint64_t Size,
+ const std::filesystem::path& DestinationPath) = 0;
+ virtual void Touch(ParallelWork& Work, WorkerThreadPool& WorkerPool, const IoHash& Hash) = 0;
+ virtual void Delete(ParallelWork& Work, WorkerThreadPool& WorkerPool) = 0;
+ };
-void
-FileHydrator::Hydrate()
-{
- ZEN_INFO("Hydrating state from '{}' to '{}'", m_StorageModuleRootDir, m_Config.ServerStateDir);
+ class FileStorage : public StorageBase
+ {
+ public:
+ static constexpr std::string_view Prefix = "file://";
+ static constexpr std::string_view Type = "file";
+
+ explicit FileStorage(std::filesystem::path ModulePath);
+
+ virtual void SaveMetadata(const CbObject& Data) override;
+ virtual CbObject LoadMetadata() override;
+ virtual CbObject GetSettings() override { return {}; }
+ virtual void ParseSettings(const CbObjectView&) override {}
+ virtual std::vector<IoHash> List() override;
+ virtual void Put(ParallelWork& Work,
+ WorkerThreadPool& WorkerPool,
+ const IoHash& Hash,
+ uint64_t Size,
+ const std::filesystem::path& SourcePath) override;
+ virtual void Get(ParallelWork& Work,
+ WorkerThreadPool& WorkerPool,
+ const IoHash& Hash,
+ uint64_t Size,
+ const std::filesystem::path& DestinationPath) override;
+ virtual void Touch(ParallelWork&, WorkerThreadPool&, const IoHash&) override {}
+ virtual void Delete(ParallelWork& Work, WorkerThreadPool& WorkerPool) override;
+
+ private:
+ std::filesystem::path m_StoragePath;
+ std::filesystem::path m_StatePathName;
+ std::filesystem::path m_CASPath;
+ };
- // Ensure target is clean
- ZEN_DEBUG("Wiping server state at '{}'", m_Config.ServerStateDir);
- const bool ForceRemoveReadOnlyFiles = true;
- CleanDirectory(m_Config.ServerStateDir, ForceRemoveReadOnlyFiles);
+ class S3Storage : public StorageBase
+ {
+ public:
+ static constexpr std::string_view Prefix = "s3://";
+ static constexpr std::string_view Type = "s3";
+ static constexpr uint64_t DefaultMultipartChunkSize = 32u * 1024u * 1024u;
+
+ S3Storage(S3Client& Client, std::string KeyPrefix, std::filesystem::path TempDir, uint64_t MultipartChunkSize);
+
+ virtual void SaveMetadata(const CbObject& Data) override;
+ virtual CbObject LoadMetadata() override;
+ virtual CbObject GetSettings() override;
+ virtual void ParseSettings(const CbObjectView& Settings) override;
+ virtual std::vector<IoHash> List() override;
+ virtual void Put(ParallelWork& Work,
+ WorkerThreadPool& WorkerPool,
+ const IoHash& Hash,
+ uint64_t Size,
+ const std::filesystem::path& SourcePath) override;
+ virtual void Get(ParallelWork& Work,
+ WorkerThreadPool& WorkerPool,
+ const IoHash& Hash,
+ uint64_t Size,
+ const std::filesystem::path& DestinationPath) override;
+ virtual void Touch(ParallelWork& Work, WorkerThreadPool& WorkerPool, const IoHash& Hash) override;
+ virtual void Delete(ParallelWork& Work, WorkerThreadPool& WorkerPool) override;
+
+ private:
+ S3Client& m_Client;
+ std::string m_KeyPrefix;
+ std::filesystem::path m_TempDir;
+ uint64_t m_MultipartChunkSize;
+ };
- bool WipeServerState = false;
+ ///////////////////////////////////////////////////////////////////////
+ // FileStorage implementations
- try
+ FileStorage::FileStorage(std::filesystem::path ModulePath) : m_StoragePath(std::move(ModulePath))
{
- ZEN_DEBUG("Copying '{}' to '{}'", m_StorageModuleRootDir, m_Config.ServerStateDir);
- CopyTree(m_StorageModuleRootDir, m_Config.ServerStateDir, {.EnableClone = true});
+ MakeSafeAbsolutePathInPlace(m_StoragePath);
+ m_StatePathName = m_StoragePath / "current-state.cbo";
+ m_CASPath = m_StoragePath / "cas";
+ CreateDirectories(m_CASPath);
}
- catch (std::exception& Ex)
+
+ void FileStorage::SaveMetadata(const CbObject& Data)
{
- ZEN_WARN("Copy failed: {}. Will wipe any partially copied state from '{}'", Ex.what(), m_Config.ServerStateDir);
+ BinaryWriter Output;
+ SaveCompactBinary(Output, Data);
+ WriteFile(m_StatePathName, IoBuffer(IoBuffer::Wrap, Output.GetData(), Output.GetSize()));
+ }
- // We don't do the clean right here to avoid potentially running into double-throws
- WipeServerState = true;
+ CbObject FileStorage::LoadMetadata()
+ {
+ if (!IsFile(m_StatePathName))
+ {
+ return {};
+ }
+ FileContents Content = ReadFile(m_StatePathName);
+ if (Content.ErrorCode)
+ {
+ ThrowSystemError(Content.ErrorCode.value(), "Failed to read state file");
+ }
+ IoBuffer Payload = Content.Flatten();
+ CbValidateError Error;
+ CbObject Result = ValidateAndReadCompactBinaryObject(std::move(Payload), Error);
+ if (Error != CbValidateError::None)
+ {
+ throw std::runtime_error(fmt::format("Failed to read {} state file. Reason: {}", m_StatePathName, ToString(Error)));
+ }
+ return Result;
}
- if (WipeServerState)
+ std::vector<IoHash> FileStorage::List()
{
- ZEN_DEBUG("Cleaning server state '{}'", m_Config.ServerStateDir);
- CleanDirectory(m_Config.ServerStateDir, ForceRemoveReadOnlyFiles);
+ DirectoryContent DirContent;
+ GetDirectoryContent(m_CASPath, DirectoryContentFlags::IncludeFiles, DirContent);
+ std::vector<IoHash> Result;
+ Result.reserve(DirContent.Files.size());
+ for (const std::filesystem::path& Path : DirContent.Files)
+ {
+ IoHash Hash;
+ if (IoHash::TryParse(Path.filename().string(), Hash))
+ {
+ Result.push_back(Hash);
+ }
+ }
+ return Result;
}
-}
-void
-FileHydrator::Dehydrate()
-{
- ZEN_INFO("Dehydrating state from '{}' to '{}'", m_Config.ServerStateDir, m_StorageModuleRootDir);
+ void FileStorage::Put(ParallelWork& Work,
+ WorkerThreadPool& WorkerPool,
+ const IoHash& Hash,
+ uint64_t Size,
+ const std::filesystem::path& SourcePath)
+ {
+ ZEN_UNUSED(Size);
+ Work.ScheduleWork(WorkerPool,
+ [this, Hash = IoHash(Hash), SourcePath = std::filesystem::path(SourcePath)](std::atomic<bool>& AbortFlag) {
+ if (!AbortFlag.load())
+ {
+ std::filesystem::path DestPath = m_CASPath / fmt::format("{}", Hash);
+ if (std::error_code Ec = CopyFile(SourcePath, DestPath, CopyFileOptions{.EnableClone = true}); Ec)
+ {
+ throw std::system_error(Ec, fmt::format("Failed to copy '{}' to '{}'", SourcePath, DestPath));
+ }
+ }
+ });
+ }
- const std::filesystem::path TargetDir = m_StorageModuleRootDir;
+ void FileStorage::Get(ParallelWork& Work,
+ WorkerThreadPool& WorkerPool,
+ const IoHash& Hash,
+ uint64_t Size,
+ const std::filesystem::path& DestinationPath)
+ {
+ ZEN_UNUSED(Size);
+ Work.ScheduleWork(
+ WorkerPool,
+ [this, Hash = IoHash(Hash), DestinationPath = std::filesystem::path(DestinationPath)](std::atomic<bool>& AbortFlag) {
+ if (!AbortFlag.load())
+ {
+ std::filesystem::path SourcePath = m_CASPath / fmt::format("{}", Hash);
+ if (std::error_code Ec = CopyFile(SourcePath, DestinationPath, CopyFileOptions{.EnableClone = true}); Ec)
+ {
+ throw std::system_error(Ec, fmt::format("Failed to copy '{}' to '{}'", SourcePath, DestinationPath));
+ }
+ }
+ });
+ }
- // Ensure target is clean. This could be replaced with an atomic copy at a later date
- // (i.e copy into a temporary directory name and rename it once complete)
+ void FileStorage::Delete(ParallelWork& Work, WorkerThreadPool& WorkerPool)
+ {
+ ZEN_UNUSED(Work);
+ ZEN_UNUSED(WorkerPool);
+ DeleteDirectories(m_StoragePath);
+ }
- ZEN_DEBUG("Cleaning storage root '{}'", TargetDir);
- const bool ForceRemoveReadOnlyFiles = true;
- CleanDirectory(TargetDir, ForceRemoveReadOnlyFiles);
+ ///////////////////////////////////////////////////////////////////////
+ // S3Storage implementations
- bool CopySuccess = true;
+ S3Storage::S3Storage(S3Client& Client, std::string KeyPrefix, std::filesystem::path TempDir, uint64_t MultipartChunkSize)
+ : m_Client(Client)
+ , m_KeyPrefix(std::move(KeyPrefix))
+ , m_TempDir(std::move(TempDir))
+ , m_MultipartChunkSize(MultipartChunkSize)
+ {
+ }
- try
+ void S3Storage::SaveMetadata(const CbObject& Data)
{
- ZEN_DEBUG("Copying '{}' to '{}'", m_Config.ServerStateDir, TargetDir);
- CopyTree(m_Config.ServerStateDir, TargetDir, {.EnableClone = true});
+ BinaryWriter Output;
+ SaveCompactBinary(Output, Data);
+ IoBuffer Payload(IoBuffer::Clone, Output.GetData(), Output.GetSize());
+
+ std::string Key = m_KeyPrefix + "/incremental-state.cbo";
+ S3Result Result = m_Client.PutObject(Key, std::move(Payload));
+ if (!Result.IsSuccess())
+ {
+ throw zen::runtime_error("Failed to save incremental metadata to '{}': {}", Key, Result.Error);
+ }
}
- catch (std::exception& Ex)
+
+ CbObject S3Storage::LoadMetadata()
{
- ZEN_WARN("Copy failed: {}. Will wipe any partially copied state from '{}'", Ex.what(), m_StorageModuleRootDir);
+ std::string Key = m_KeyPrefix + "/incremental-state.cbo";
+ S3GetObjectResult Result = m_Client.GetObject(Key);
+ if (!Result.IsSuccess())
+ {
+ if (Result.Error == S3GetObjectResult::NotFoundErrorText)
+ {
+ return {};
+ }
+ throw zen::runtime_error("Failed to load incremental metadata from '{}': {}", Key, Result.Error);
+ }
- // We don't do the clean right here to avoid potentially running into double-throws
- CopySuccess = false;
+ CbValidateError Error;
+ CbObject Meta = ValidateAndReadCompactBinaryObject(std::move(Result.Content), Error);
+ if (Error != CbValidateError::None)
+ {
+ throw zen::runtime_error("Failed to parse incremental metadata from '{}': {}", Key, ToString(Error));
+ }
+ return Meta;
}
- if (!CopySuccess)
+ CbObject S3Storage::GetSettings()
{
- ZEN_DEBUG("Removing partially copied state from '{}'", TargetDir);
- CleanDirectory(TargetDir, ForceRemoveReadOnlyFiles);
+ CbObjectWriter Writer;
+ Writer << "MultipartChunkSize" << m_MultipartChunkSize;
+ return Writer.Save();
}
- ZEN_DEBUG("Wiping server state '{}'", m_Config.ServerStateDir);
- CleanDirectory(m_Config.ServerStateDir, ForceRemoveReadOnlyFiles);
-}
+ void S3Storage::ParseSettings(const CbObjectView& Settings)
+ {
+ m_MultipartChunkSize = Settings["MultipartChunkSize"].AsUInt64(S3Storage::DefaultMultipartChunkSize);
+ }
-///////////////////////////////////////////////////////////////////////////
+ std::vector<IoHash> S3Storage::List()
+ {
+ std::string CasPrefix = m_KeyPrefix + "/cas/";
+ S3ListObjectsResult Result = m_Client.ListObjects(CasPrefix);
+ if (!Result.IsSuccess())
+ {
+ throw zen::runtime_error("Failed to list S3 objects under '{}': {}", CasPrefix, Result.Error);
+ }
-constexpr std::string_view S3HydratorPrefix = "s3://";
+ std::vector<IoHash> Hashes;
+ Hashes.reserve(Result.Objects.size());
+ for (const S3ObjectInfo& Obj : Result.Objects)
+ {
+ size_t LastSlash = Obj.Key.rfind('/');
+ if (LastSlash == std::string::npos)
+ {
+ continue;
+ }
+ IoHash Hash;
+ if (IoHash::TryParse(Obj.Key.substr(LastSlash + 1), Hash))
+ {
+ Hashes.push_back(Hash);
+ }
+ }
+ return Hashes;
+ }
-struct S3Hydrator : public HydrationStrategyBase
-{
- void Configure(const HydrationConfig& Config) override;
- void Dehydrate() override;
- void Hydrate() override;
+ void S3Storage::Put(ParallelWork& Work,
+ WorkerThreadPool& WorkerPool,
+ const IoHash& Hash,
+ uint64_t Size,
+ const std::filesystem::path& SourcePath)
+ {
+ Work.ScheduleWork(WorkerPool,
+ [this, Hash = IoHash(Hash), Size, SourcePath = std::filesystem::path(SourcePath)](std::atomic<bool>& AbortFlag) {
+ if (AbortFlag.load())
+ {
+ return;
+ }
+ S3Client& Client = m_Client;
+ std::string Key = m_KeyPrefix + "/cas/" + fmt::format("{}", Hash);
+
+ if (Size >= (m_MultipartChunkSize + (m_MultipartChunkSize / 4)))
+ {
+ BasicFile File(SourcePath, BasicFile::Mode::kRead);
+ S3Result Result = Client.PutObjectMultipart(
+ Key,
+ Size,
+ [&File](uint64_t Offset, uint64_t ChunkSize) { return File.ReadRange(Offset, ChunkSize); },
+ m_MultipartChunkSize);
+ if (!Result.IsSuccess())
+ {
+ throw zen::runtime_error("Failed to upload '{}' to S3: {}", Key, Result.Error);
+ }
+ }
+ else
+ {
+ BasicFile File(SourcePath, BasicFile::Mode::kRead);
+ S3Result Result = Client.PutObject(Key, File.ReadAll());
+ if (!Result.IsSuccess())
+ {
+ throw zen::runtime_error("Failed to upload '{}' to S3: {}", Key, Result.Error);
+ }
+ }
+ });
+ }
-private:
- S3Client CreateS3Client() const;
- std::string BuildTimestampFolderName() const;
- std::string MakeObjectKey(std::string_view FolderName, const std::filesystem::path& RelPath) const;
+ void S3Storage::Get(ParallelWork& Work,
+ WorkerThreadPool& WorkerPool,
+ const IoHash& Hash,
+ uint64_t Size,
+ const std::filesystem::path& DestinationPath)
+ {
+ std::string Key = m_KeyPrefix + "/cas/" + fmt::format("{}", Hash);
- HydrationConfig m_Config;
- std::string m_Bucket;
- std::string m_KeyPrefix; // "<user-prefix>/<ModuleId>" or just "<ModuleId>" - no trailing slash
- std::string m_Region;
- SigV4Credentials m_Credentials;
- Ref<ImdsCredentialProvider> m_CredentialProvider;
-};
+ if (Size >= (m_MultipartChunkSize + (m_MultipartChunkSize / 4)))
+ {
+ class WorkData
+ {
+ public:
+ WorkData(const std::filesystem::path& DestPath, uint64_t Size) : m_DestFile(DestPath, BasicFile::Mode::kTruncate)
+ {
+ PrepareFileForScatteredWrite(m_DestFile.Handle(), Size);
+ }
+ ~WorkData() { m_DestFile.Flush(); }
+ void Write(const void* Data, uint64_t Size, uint64_t Offset) { m_DestFile.Write(Data, Size, Offset); }
-void
-S3Hydrator::Configure(const HydrationConfig& Config)
-{
- m_Config = Config;
+ private:
+ BasicFile m_DestFile;
+ };
- std::string_view Spec = m_Config.TargetSpecification;
- Spec.remove_prefix(S3HydratorPrefix.size());
+ std::shared_ptr<WorkData> Data = std::make_shared<WorkData>(DestinationPath, Size);
- size_t SlashPos = Spec.find('/');
- std::string UserPrefix = SlashPos != std::string_view::npos ? std::string(Spec.substr(SlashPos + 1)) : std::string{};
- m_Bucket = std::string(SlashPos != std::string_view::npos ? Spec.substr(0, SlashPos) : Spec);
- m_KeyPrefix = UserPrefix.empty() ? m_Config.ModuleId : UserPrefix + "/" + m_Config.ModuleId;
+ uint64_t Offset = 0;
+ while (Offset < Size)
+ {
+ uint64_t ChunkSize = std::min<uint64_t>(m_MultipartChunkSize, Size - Offset);
- ZEN_ASSERT(!m_Bucket.empty());
+ Work.ScheduleWork(WorkerPool, [this, Key = Key, Offset, ChunkSize, Data](std::atomic<bool>& AbortFlag) {
+ if (AbortFlag)
+ {
+ return;
+ }
+ S3GetObjectResult Chunk = m_Client.GetObjectRange(Key, Offset, ChunkSize);
+ if (!Chunk.IsSuccess())
+ {
+ throw zen::runtime_error("Failed to download '{}' bytes [{}-{}] from S3: {}",
+ Key,
+ Offset,
+ Offset + ChunkSize - 1,
+ Chunk.Error);
+ }
- std::string Region = GetEnvVariable("AWS_DEFAULT_REGION");
- if (Region.empty())
- {
- Region = GetEnvVariable("AWS_REGION");
- }
- if (Region.empty())
- {
- Region = "us-east-1";
+ Data->Write(Chunk.Content.GetData(), Chunk.Content.GetSize(), Offset);
+ });
+ Offset += ChunkSize;
+ }
+ }
+ else
+ {
+ Work.ScheduleWork(WorkerPool,
+ [this, Key = Key, DestinationPath = std::filesystem::path(DestinationPath)](std::atomic<bool>& AbortFlag) {
+ if (AbortFlag)
+ {
+ return;
+ }
+ S3GetObjectResult Chunk = m_Client.GetObject(Key, m_TempDir);
+ if (!Chunk.IsSuccess())
+ {
+ throw zen::runtime_error("Failed to download '{}' from S3: {}", Key, Chunk.Error);
+ }
+
+ if (IoBufferFileReference FileRef; Chunk.Content.GetFileReference(FileRef))
+ {
+ std::error_code Ec;
+ std::filesystem::path ChunkPath = PathFromHandle(FileRef.FileHandle, Ec);
+ if (Ec)
+ {
+ WriteFile(DestinationPath, Chunk.Content);
+ }
+ else
+ {
+ Chunk.Content.SetDeleteOnClose(false);
+ Chunk.Content = {};
+ RenameFile(ChunkPath, DestinationPath, Ec);
+ if (Ec)
+ {
+ Chunk.Content = IoBufferBuilder::MakeFromFile(ChunkPath);
+ Chunk.Content.SetDeleteOnClose(true);
+ WriteFile(DestinationPath, Chunk.Content);
+ }
+ }
+ }
+ else
+ {
+ WriteFile(DestinationPath, Chunk.Content);
+ }
+ });
+ }
}
- m_Region = std::move(Region);
- std::string AccessKeyId = GetEnvVariable("AWS_ACCESS_KEY_ID");
- if (AccessKeyId.empty())
- {
- m_CredentialProvider = Ref<ImdsCredentialProvider>(new ImdsCredentialProvider({}));
- }
- else
+ void S3Storage::Touch(ParallelWork& Work, WorkerThreadPool& WorkerPool, const IoHash& Hash)
{
- m_Credentials.AccessKeyId = std::move(AccessKeyId);
- m_Credentials.SecretAccessKey = GetEnvVariable("AWS_SECRET_ACCESS_KEY");
- m_Credentials.SessionToken = GetEnvVariable("AWS_SESSION_TOKEN");
+ Work.ScheduleWork(WorkerPool, [this, Hash = IoHash(Hash)](std::atomic<bool>& AbortFlag) {
+ if (AbortFlag.load())
+ {
+ return;
+ }
+ std::string Key = m_KeyPrefix + "/cas/" + fmt::format("{}", Hash);
+ S3Result Result = m_Client.Touch(Key);
+ if (!Result.IsSuccess())
+ {
+ throw zen::runtime_error("Failed to touch '{}' in S3: {}", Key, Result.Error);
+ }
+ });
}
-}
-S3Client
-S3Hydrator::CreateS3Client() const
-{
- S3ClientOptions Options;
- Options.BucketName = m_Bucket;
- Options.Region = m_Region;
-
- if (!m_Config.S3Endpoint.empty())
+ void S3Storage::Delete(ParallelWork& Work, WorkerThreadPool& WorkerPool)
{
- Options.Endpoint = m_Config.S3Endpoint;
- Options.PathStyle = m_Config.S3PathStyle;
+ std::string ModulePrefix = m_KeyPrefix + "/";
+ S3ListObjectsResult ListResult = m_Client.ListObjects(ModulePrefix);
+ if (!ListResult.IsSuccess())
+ {
+ throw zen::runtime_error("Failed to list S3 objects for deletion under '{}': {}", ModulePrefix, ListResult.Error);
+ }
+ for (const S3ObjectInfo& Obj : ListResult.Objects)
+ {
+ Work.ScheduleWork(WorkerPool, [this, Key = Obj.Key](std::atomic<bool>& AbortFlag) {
+ if (AbortFlag.load())
+ {
+ return;
+ }
+ S3Result DelResult = m_Client.DeleteObject(Key);
+ if (!DelResult.IsSuccess())
+ {
+ throw zen::runtime_error("Failed to delete S3 object '{}': {}", Key, DelResult.Error);
+ }
+ });
+ }
}
- if (m_CredentialProvider)
- {
- Options.CredentialProvider = m_CredentialProvider;
- }
- else
+ ///////////////////////////////////////////////////////////////////////
+ // IncrementalHydrator: the only HydrationStrategyBase implementation.
+ // Holds a per-module StorageBase and threading context; drives the
+ // hydrate/dehydrate algorithm.
+
+ class IncrementalHydrator : public HydrationStrategyBase
{
- Options.Credentials = m_Credentials;
- }
+ public:
+ IncrementalHydrator(const HydrationConfig& Config, std::unique_ptr<StorageBase> Storage);
+ virtual ~IncrementalHydrator() override;
- return S3Client(Options);
-}
+ virtual void Dehydrate(const CbObject& CachedState) override;
+ virtual CbObject Hydrate() override;
+ virtual void Obliterate() override;
-std::string
-S3Hydrator::BuildTimestampFolderName() const
-{
- UtcTime Now = UtcTime::Now();
- return fmt::format("{:04d}{:02d}{:02d}-{:02d}{:02d}{:02d}-{:03d}",
- Now.Tm.tm_year + 1900,
- Now.Tm.tm_mon + 1,
- Now.Tm.tm_mday,
- Now.Tm.tm_hour,
- Now.Tm.tm_min,
- Now.Tm.tm_sec,
- Now.Ms);
-}
+ private:
+ struct Entry
+ {
+ std::filesystem::path RelativePath;
+ uint64_t Size;
+ uint64_t ModTick;
+ IoHash Hash;
+ };
-std::string
-S3Hydrator::MakeObjectKey(std::string_view FolderName, const std::filesystem::path& RelPath) const
-{
- return m_KeyPrefix + "/" + std::string(FolderName) + "/" + RelPath.generic_string();
-}
+ std::unique_ptr<StorageBase> m_Storage;
+ HydrationConfig m_Config;
+ WorkerThreadPool m_FallbackWorkPool;
+ std::atomic<bool> m_FallbackAbortFlag{false};
+ std::atomic<bool> m_FallbackPauseFlag{false};
+ HydrationConfig::ThreadingOptions m_Threading{.WorkerPool = &m_FallbackWorkPool,
+ .AbortFlag = &m_FallbackAbortFlag,
+ .PauseFlag = &m_FallbackPauseFlag};
+ };
-void
-S3Hydrator::Dehydrate()
-{
- ZEN_INFO("Dehydrating state from '{}' to s3://{}/{}", m_Config.ServerStateDir, m_Bucket, m_KeyPrefix);
+ ///////////////////////////////////////////////////////////////////////
+ // IncrementalHydrator implementations
- try
+ IncrementalHydrator::IncrementalHydrator(const HydrationConfig& Config, std::unique_ptr<StorageBase> Storage)
+ : m_Storage(std::move(Storage))
+ , m_Config(Config)
+ , m_FallbackWorkPool(0)
{
- S3Client Client = CreateS3Client();
- std::string FolderName = BuildTimestampFolderName();
- uint64_t TotalBytes = 0;
- uint32_t FileCount = 0;
- std::chrono::steady_clock::time_point UploadStart = std::chrono::steady_clock::now();
+ if (Config.Threading)
+ {
+ m_Threading = *Config.Threading;
+ }
+ }
- DirectoryContent DirContent;
- GetDirectoryContent(m_Config.ServerStateDir, DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::Recursive, DirContent);
+ IncrementalHydrator::~IncrementalHydrator() { m_Storage.reset(); }
+
+ void IncrementalHydrator::Dehydrate(const CbObject& CachedState)
+ {
+ Stopwatch Timer;
- for (const std::filesystem::path& AbsPath : DirContent.Files)
+ const std::filesystem::path ServerStateDir = MakeSafeAbsolutePath(m_Config.ServerStateDir);
+ try
{
- std::filesystem::path RelPath = AbsPath.lexically_relative(m_Config.ServerStateDir);
- if (RelPath.empty() || *RelPath.begin() == "..")
+ std::unordered_map<std::string, size_t> StateEntryLookup;
+ std::vector<Entry> StateEntries;
+ for (CbFieldView FieldView : CachedState["Files"].AsArrayView())
{
- throw zen::runtime_error(
- "lexically_relative produced a '..'-escape path for '{}' relative to '{}' - "
- "path form mismatch (e.g. \\\\?\\ prefix on one but not the other)",
- AbsPath.string(),
- m_Config.ServerStateDir.string());
+ CbObjectView EntryView = FieldView.AsObjectView();
+ std::filesystem::path RelativePath(EntryView["Path"].AsString());
+ uint64_t Size = EntryView["Size"].AsUInt64();
+ uint64_t ModTick = EntryView["ModTick"].AsUInt64();
+ IoHash Hash = EntryView["Hash"].AsHash();
+
+ StateEntryLookup.insert_or_assign(RelativePath.generic_string(), StateEntries.size());
+ StateEntries.push_back(Entry{.RelativePath = RelativePath, .Size = Size, .ModTick = ModTick, .Hash = Hash});
}
- std::string Key = MakeObjectKey(FolderName, RelPath);
- BasicFile File(AbsPath, BasicFile::Mode::kRead);
- uint64_t FileSize = File.FileSize();
+ DirectoryContent DirContent;
+ GetDirectoryContent(*m_Threading.WorkerPool,
+ ServerStateDir,
+ DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::Recursive |
+ DirectoryContentFlags::IncludeFileSizes | DirectoryContentFlags::IncludeModificationTick,
+ DirContent);
+
+ ZEN_INFO("Dehydrating module '{}' from folder '{}'. {} ({}) files",
+ m_Config.ModuleId,
+ m_Config.ServerStateDir,
+ DirContent.Files.size(),
+ NiceBytes(std::accumulate(DirContent.FileSizes.begin(), DirContent.FileSizes.end(), uint64_t(0))));
+
+ std::vector<Entry> Entries;
+ Entries.resize(DirContent.Files.size());
+
+ uint64_t TotalBytes = 0;
+ uint64_t TotalFiles = 0;
+ uint64_t HashedFiles = 0;
+ uint64_t HashedBytes = 0;
+
+ std::unordered_set<IoHash> ExistsLookup;
- S3Result UploadResult =
- Client.PutObjectMultipart(Key, FileSize, [&File](uint64_t Offset, uint64_t Size) { return File.ReadRange(Offset, Size); });
- if (!UploadResult.IsSuccess())
{
- throw zen::runtime_error("Failed to upload '{}' to S3: {}", Key, UploadResult.Error);
- }
+ ParallelWork Work(*m_Threading.AbortFlag, *m_Threading.PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
- TotalBytes += FileSize;
- ++FileCount;
- }
+ for (size_t FileIndex = 0; FileIndex < DirContent.Files.size(); FileIndex++)
+ {
+ const std::filesystem::path AbsPath = MakeSafeAbsolutePath(DirContent.Files[FileIndex]);
+ if (AbsPath.filename() == "reserve.gc")
+ {
+ continue;
+ }
+ const std::filesystem::path RelativePath = FastRelativePath(ServerStateDir, DirContent.Files[FileIndex]);
+ if (*RelativePath.begin() == ".sentry-native")
+ {
+ continue;
+ }
+ if (RelativePath == ".lock")
+ {
+ continue;
+ }
- // Write current-state.json
- int64_t UploadDurationMs =
- std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - UploadStart).count();
-
- UtcTime Now = UtcTime::Now();
- std::string UploadTimeUtc = fmt::format("{:04d}-{:02d}-{:02d}T{:02d}:{:02d}:{:02d}.{:03d}Z",
- Now.Tm.tm_year + 1900,
- Now.Tm.tm_mon + 1,
- Now.Tm.tm_mday,
- Now.Tm.tm_hour,
- Now.Tm.tm_min,
- Now.Tm.tm_sec,
- Now.Ms);
-
- CbObjectWriter Meta;
- Meta << "FolderName" << FolderName;
- Meta << "ModuleId" << m_Config.ModuleId;
- Meta << "HostName" << GetMachineName();
- Meta << "UploadTimeUtc" << UploadTimeUtc;
- Meta << "UploadDurationMs" << UploadDurationMs;
- Meta << "TotalSizeBytes" << TotalBytes;
- Meta << "FileCount" << FileCount;
-
- ExtendableStringBuilder<1024> JsonBuilder;
- Meta.Save().ToJson(JsonBuilder);
-
- std::string MetaKey = m_KeyPrefix + "/current-state.json";
- std::string_view JsonText = JsonBuilder.ToView();
- IoBuffer MetaBuf(IoBuffer::Clone, JsonText.data(), JsonText.size());
- S3Result MetaUploadResult = Client.PutObject(MetaKey, std::move(MetaBuf));
- if (!MetaUploadResult.IsSuccess())
- {
- throw zen::runtime_error("Failed to write current-state.json to '{}': {}", MetaKey, MetaUploadResult.Error);
- }
+ Entry& CurrentEntry = Entries[TotalFiles];
+ CurrentEntry.RelativePath = RelativePath;
+ CurrentEntry.Size = DirContent.FileSizes[FileIndex];
+ CurrentEntry.ModTick = DirContent.FileModificationTicks[FileIndex];
- ZEN_INFO("Dehydration complete: {} files, {} bytes, {} ms", FileCount, TotalBytes, UploadDurationMs);
- }
- catch (std::exception& Ex)
- {
- // Any in-progress multipart upload has already been aborted by PutObjectMultipart.
- // current-state.json is only written on success, so the previous S3 state remains valid.
- ZEN_WARN("S3 dehydration failed: {}. S3 state not updated.", Ex.what());
- }
-}
+ bool FoundHash = false;
+ if (auto KnownIt = StateEntryLookup.find(CurrentEntry.RelativePath.generic_string()); KnownIt != StateEntryLookup.end())
+ {
+ const Entry& StateEntry = StateEntries[KnownIt->second];
+ if (StateEntry.Size == CurrentEntry.Size && StateEntry.ModTick == CurrentEntry.ModTick)
+ {
+ CurrentEntry.Hash = StateEntry.Hash;
+ FoundHash = true;
+ }
+ }
-void
-S3Hydrator::Hydrate()
-{
- ZEN_INFO("Hydrating state from s3://{}/{} to '{}'", m_Bucket, m_KeyPrefix, m_Config.ServerStateDir);
+ if (!FoundHash)
+ {
+ Work.ScheduleWork(*m_Threading.WorkerPool,
+ [AbsPath, EntryIndex = TotalFiles, &Entries](std::atomic<bool>& AbortFlag) {
+ if (AbortFlag)
+ {
+ return;
+ }
+
+ Entry& CurrentEntry = Entries[EntryIndex];
+
+ bool FoundHash = false;
+ if (AbsPath.extension().empty())
+ {
+ auto It = CurrentEntry.RelativePath.begin();
+ if (It != CurrentEntry.RelativePath.end() && It->filename().string().ends_with("cas"))
+ {
+ IoHash RawHash;
+ uint64_t RawSize;
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressed(
+ SharedBuffer(IoBufferBuilder::MakeFromFile(AbsPath)),
+ RawHash,
+ RawSize);
+ if (Compressed)
+ {
+ // We compose a meta-hash since taking the RawHash might collide with an
+ // existing non-compressed file with the same content The collision is
+ // unlikely except if the compressed data is zero bytes causing RawHash
+ // to be the same as an empty file.
+ IoHashStream Hasher;
+ Hasher.Append(RawHash.Hash, sizeof(RawHash.Hash));
+ Hasher.Append(&CurrentEntry.Size, sizeof(CurrentEntry.Size));
+ CurrentEntry.Hash = Hasher.GetHash();
+ FoundHash = true;
+ }
+ }
+ }
+
+ if (!FoundHash)
+ {
+ CurrentEntry.Hash = IoHash::HashBuffer(IoBufferBuilder::MakeFromFile(AbsPath));
+ }
+ });
+ HashedFiles++;
+ HashedBytes += CurrentEntry.Size;
+ }
+ TotalFiles++;
+ TotalBytes += CurrentEntry.Size;
+ }
- const bool ForceRemoveReadOnlyFiles = true;
+ std::vector<IoHash> ExistingEntries = m_Storage->List();
+ ExistsLookup.insert(ExistingEntries.begin(), ExistingEntries.end());
- // Clean temp dir before starting in case of leftover state from a previous failed hydration
- ZEN_DEBUG("Cleaning temp dir '{}'", m_Config.TempDir);
- CleanDirectory(m_Config.TempDir, ForceRemoveReadOnlyFiles);
+ Work.Wait();
- bool WipeServerState = false;
+ Entries.resize(TotalFiles);
+ }
- try
- {
- S3Client Client = CreateS3Client();
- std::string MetaKey = m_KeyPrefix + "/current-state.json";
+ uint64_t UploadedFiles = 0;
+ uint64_t UploadedBytes = 0;
+ uint64_t TouchedFiles = 0;
+ uint64_t TouchedBytes = 0;
+ {
+ ParallelWork Work(*m_Threading.AbortFlag, *m_Threading.PauseFlag, WorkerThreadPool::EMode::DisableBacklog);
- S3HeadObjectResult HeadResult = Client.HeadObject(MetaKey);
- if (HeadResult.Status == HeadObjectResult::NotFound)
- {
- throw zen::runtime_error("No state found in S3 at '{}'", MetaKey);
- }
- if (!HeadResult.IsSuccess())
- {
- throw zen::runtime_error("Failed to check for state in S3 at '{}': {}", MetaKey, HeadResult.Error);
- }
+ for (const Entry& CurrentEntry : Entries)
+ {
+ if (!ExistsLookup.contains(CurrentEntry.Hash))
+ {
+ m_Storage->Put(Work,
+ *m_Threading.WorkerPool,
+ CurrentEntry.Hash,
+ CurrentEntry.Size,
+ MakeSafeAbsolutePath(ServerStateDir / CurrentEntry.RelativePath));
+ UploadedFiles++;
+ UploadedBytes += CurrentEntry.Size;
+ }
+ else
+ {
+ // Refresh the backend's modification time so lifecycle-expiration policies
+ // do not evict CAS entries that are still referenced by this module.
+ m_Storage->Touch(Work, *m_Threading.WorkerPool, CurrentEntry.Hash);
+ TouchedFiles++;
+ TouchedBytes += CurrentEntry.Size;
+ }
+ }
- S3GetObjectResult MetaResult = Client.GetObject(MetaKey);
- if (!MetaResult.IsSuccess())
- {
- throw zen::runtime_error("Failed to read current-state.json from '{}': {}", MetaKey, MetaResult.Error);
- }
+ Work.Wait();
+ uint64_t UploadTimeMs = Timer.GetElapsedTimeMs();
+
+ UtcTime Now = UtcTime::Now();
+ std::string UploadTimeUtc = fmt::format("{:04d}-{:02d}-{:02d}T{:02d}:{:02d}:{:02d}.{:03d}Z",
+ Now.Tm.tm_year + 1900,
+ Now.Tm.tm_mon + 1,
+ Now.Tm.tm_mday,
+ Now.Tm.tm_hour,
+ Now.Tm.tm_min,
+ Now.Tm.tm_sec,
+ Now.Ms);
+
+ CbObjectWriter Meta;
+ Meta << "SourceFolder" << ServerStateDir.generic_string();
+ Meta << "ModuleId" << m_Config.ModuleId;
+ Meta << "HostName" << GetMachineName();
+ Meta << "UploadTimeUtc" << UploadTimeUtc;
+ Meta << "UploadDurationMs" << UploadTimeMs;
+ Meta << "TotalSizeBytes" << TotalBytes;
+ Meta << "StorageSettings" << m_Storage->GetSettings();
+
+ Meta.BeginArray("Files");
+ for (const Entry& CurrentEntry : Entries)
+ {
+ Meta.BeginObject();
+ {
+ Meta << "Path" << CurrentEntry.RelativePath.generic_string();
+ Meta << "Size" << CurrentEntry.Size;
+ Meta << "ModTick" << CurrentEntry.ModTick;
+ Meta << "Hash" << CurrentEntry.Hash;
+ }
+ Meta.EndObject();
+ }
+ Meta.EndArray();
- std::string ParseError;
- json11::Json MetaJson = json11::Json::parse(std::string(MetaResult.AsText()), ParseError);
- if (!ParseError.empty())
- {
- throw zen::runtime_error("Failed to parse current-state.json from '{}': {}", MetaKey, ParseError);
- }
+ m_Storage->SaveMetadata(Meta.Save());
+ }
- std::string FolderName = MetaJson["FolderName"].string_value();
- if (FolderName.empty())
- {
- throw zen::runtime_error("current-state.json from '{}' has missing or empty FolderName", MetaKey);
+ ZEN_DEBUG("Cleaning server state '{}'", m_Config.ServerStateDir);
+ CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, ServerStateDir);
+
+ ZEN_INFO(
+ "Dehydration of module '{}' completed from folder '{}'. Hashed {} ({}). Uploaded {} ({}). Touched {} ({}). Total {} ({}) "
+ "in {}",
+ m_Config.ModuleId,
+ m_Config.ServerStateDir,
+ HashedFiles,
+ NiceBytes(HashedBytes),
+ UploadedFiles,
+ NiceBytes(UploadedBytes),
+ TouchedFiles,
+ NiceBytes(TouchedBytes),
+ TotalFiles,
+ NiceBytes(TotalBytes),
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
}
-
- std::string FolderPrefix = m_KeyPrefix + "/" + FolderName + "/";
- S3ListObjectsResult ListResult = Client.ListObjects(FolderPrefix);
- if (!ListResult.IsSuccess())
+ catch (const std::exception& Ex)
{
- throw zen::runtime_error("Failed to list S3 objects under '{}': {}", FolderPrefix, ListResult.Error);
+ ZEN_WARN("Dehydration of module '{}' failed: {}. Leaving server state '{}'",
+ m_Config.ModuleId,
+ Ex.what(),
+ m_Config.ServerStateDir);
}
+ }
- for (const S3ObjectInfo& Obj : ListResult.Objects)
+ CbObject IncrementalHydrator::Hydrate()
+ {
+ Stopwatch Timer;
+
+ const std::filesystem::path ServerStateDir = MakeSafeAbsolutePath(m_Config.ServerStateDir);
+ const std::filesystem::path TempDir = MakeSafeAbsolutePath(m_Config.TempDir);
+ try
{
- if (!Obj.Key.starts_with(FolderPrefix))
+ CbObject Meta = m_Storage->LoadMetadata();
+ if (!Meta)
{
- ZEN_WARN("Skipping unexpected S3 key '{}' (expected prefix '{}')", Obj.Key, FolderPrefix);
- continue;
+ ZEN_INFO("No dehydrated state for module {} found, cleaning server state: '{}'",
+ m_Config.ModuleId,
+ m_Config.ServerStateDir);
+ CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, ServerStateDir);
+ return CbObject();
}
- std::string RelKey = Obj.Key.substr(FolderPrefix.size());
- if (RelKey.empty())
+ std::unordered_map<std::string, size_t> EntryLookup;
+ std::vector<Entry> Entries;
+ uint64_t TotalSize = 0;
+
+ for (CbFieldView FieldView : Meta["Files"])
{
- continue;
+ CbObjectView EntryView = FieldView.AsObjectView();
+ if (EntryView)
+ {
+ Entry NewEntry = {.RelativePath = std::filesystem::path(EntryView["Path"].AsString()),
+ .Size = EntryView["Size"].AsUInt64(),
+ .ModTick = EntryView["ModTick"].AsUInt64(),
+ .Hash = EntryView["Hash"].AsHash()};
+ TotalSize += NewEntry.Size;
+ EntryLookup.insert_or_assign(NewEntry.RelativePath.generic_string(), Entries.size());
+ Entries.emplace_back(std::move(NewEntry));
+ }
}
- std::filesystem::path DestPath = MakeSafeAbsolutePath(m_Config.TempDir / std::filesystem::path(RelKey));
- CreateDirectories(DestPath.parent_path());
- BasicFile DestFile(DestPath, BasicFile::Mode::kTruncate);
- DestFile.SetFileSize(Obj.Size);
+ ZEN_INFO("Hydrating module '{}' to folder '{}'. {} ({}) files",
+ m_Config.ModuleId,
+ m_Config.ServerStateDir,
+ Entries.size(),
+ NiceBytes(TotalSize));
+
+ m_Storage->ParseSettings(Meta["StorageSettings"].AsObjectView());
+
+ uint64_t DownloadedBytes = 0;
+ uint64_t DownloadedFiles = 0;
- if (Obj.Size > 0)
{
- BasicFileWriter Writer(DestFile, 64 * 1024);
+ ParallelWork Work(*m_Threading.AbortFlag, *m_Threading.PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
- uint64_t Offset = 0;
- while (Offset < Obj.Size)
+ for (const Entry& CurrentEntry : Entries)
{
- uint64_t ChunkSize = std::min<uint64_t>(8 * 1024 * 1024, Obj.Size - Offset);
- S3GetObjectResult Chunk = Client.GetObjectRange(Obj.Key, Offset, ChunkSize);
- if (!Chunk.IsSuccess())
+ std::filesystem::path Path = MakeSafeAbsolutePath(TempDir / CurrentEntry.RelativePath);
+ CreateDirectories(Path.parent_path());
+ m_Storage->Get(Work, *m_Threading.WorkerPool, CurrentEntry.Hash, CurrentEntry.Size, Path);
+ DownloadedBytes += CurrentEntry.Size;
+ DownloadedFiles++;
+ }
+
+ Work.Wait();
+ }
+
+ // Downloaded successfully - swap into ServerStateDir
+ ZEN_DEBUG("Cleaning server state '{}'", m_Config.ServerStateDir);
+ CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, ServerStateDir);
+
+ // If the two paths share at least one common component they are on the same drive/volume
+ // and atomic renames will succeed. Otherwise fall back to a full copy.
+ auto [ItTmp, ItState] = std::mismatch(TempDir.begin(), TempDir.end(), ServerStateDir.begin(), ServerStateDir.end());
+ if (ItTmp != TempDir.begin())
+ {
+ DirectoryContent DirContent;
+ GetDirectoryContent(*m_Threading.WorkerPool,
+ TempDir,
+ DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::IncludeDirs,
+ DirContent);
+
+ for (const std::filesystem::path& AbsPath : DirContent.Directories)
+ {
+ std::filesystem::path Dest = MakeSafeAbsolutePath(ServerStateDir / AbsPath.filename());
+ std::error_code Ec = RenameDirectoryWithRetry(AbsPath, Dest);
+ if (Ec)
{
- throw zen::runtime_error("Failed to download '{}' bytes [{}-{}] from S3: {}",
- Obj.Key,
- Offset,
- Offset + ChunkSize - 1,
- Chunk.Error);
+ throw std::system_error(Ec, fmt::format("Failed to rename directory from '{}' to '{}'", AbsPath, Dest));
+ }
+ }
+ for (const std::filesystem::path& AbsPath : DirContent.Files)
+ {
+ std::filesystem::path Dest = MakeSafeAbsolutePath(ServerStateDir / AbsPath.filename());
+ std::error_code Ec = RenameFileWithRetry(AbsPath, Dest);
+ if (Ec)
+ {
+ throw std::system_error(Ec, fmt::format("Failed to rename file from '{}' to '{}'", AbsPath, Dest));
}
-
- Writer.Write(Chunk.Content.GetData(), Chunk.Content.GetSize(), Offset);
- Offset += ChunkSize;
}
- Writer.Flush();
+ ZEN_DEBUG("Cleaning temp dir '{}'", m_Config.TempDir);
+ CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, TempDir);
+ }
+ else
+ {
+ // Slow path: TempDir and ServerStateDir are on different filesystems, so rename
+ // would fail. Copy the tree instead and clean up the temp files afterwards.
+ ZEN_DEBUG("TempDir and ServerStateDir are on different filesystems - using CopyTree");
+ CopyTree(TempDir, ServerStateDir, {.EnableClone = true});
+ ZEN_DEBUG("Cleaning temp dir '{}'", m_Config.TempDir);
+ CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, TempDir);
}
- }
-
- // Downloaded successfully - swap into ServerStateDir
- ZEN_DEBUG("Wiping server state '{}'", m_Config.ServerStateDir);
- CleanDirectory(m_Config.ServerStateDir, ForceRemoveReadOnlyFiles);
- // If the two paths share at least one common component they are on the same drive/volume
- // and atomic renames will succeed. Otherwise fall back to a full copy.
- auto [ItTmp, ItState] =
- std::mismatch(m_Config.TempDir.begin(), m_Config.TempDir.end(), m_Config.ServerStateDir.begin(), m_Config.ServerStateDir.end());
- if (ItTmp != m_Config.TempDir.begin())
- {
- // Fast path: atomic renames - no data copying needed
- for (const std::filesystem::directory_entry& Entry : std::filesystem::directory_iterator(m_Config.TempDir))
+ // TODO: This could perhaps be done more efficently, but ok for now
+ DirectoryContent DirContent;
+ GetDirectoryContent(*m_Threading.WorkerPool,
+ ServerStateDir,
+ DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::Recursive |
+ DirectoryContentFlags::IncludeFileSizes | DirectoryContentFlags::IncludeModificationTick,
+ DirContent);
+
+ CbObjectWriter HydrateState;
+ HydrateState.BeginArray("Files");
+ for (size_t FileIndex = 0; FileIndex < DirContent.Files.size(); FileIndex++)
{
- std::filesystem::path Dest = MakeSafeAbsolutePath(m_Config.ServerStateDir / Entry.path().filename());
- if (Entry.is_directory())
+ std::filesystem::path RelativePath = FastRelativePath(ServerStateDir, DirContent.Files[FileIndex]);
+
+ if (auto It = EntryLookup.find(RelativePath.generic_string()); It != EntryLookup.end())
{
- RenameDirectory(Entry.path(), Dest);
+ HydrateState.BeginObject();
+ {
+ HydrateState << "Path" << RelativePath.generic_string();
+ HydrateState << "Size" << DirContent.FileSizes[FileIndex];
+ HydrateState << "ModTick" << DirContent.FileModificationTicks[FileIndex];
+ HydrateState << "Hash" << Entries[It->second].Hash;
+ }
+ HydrateState.EndObject();
}
else
{
- RenameFile(Entry.path(), Dest);
+ ZEN_ASSERT(false);
}
}
- ZEN_DEBUG("Cleaning temp dir '{}'", m_Config.TempDir);
- CleanDirectory(m_Config.TempDir, ForceRemoveReadOnlyFiles);
+ HydrateState.EndArray();
+
+ CbObject StateObject = HydrateState.Save();
+
+ ZEN_INFO("Hydration of module '{}' complete to folder '{}'. {} ({}) files in {}",
+ m_Config.ModuleId,
+ m_Config.ServerStateDir,
+ DownloadedFiles,
+ NiceBytes(DownloadedBytes),
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+
+ return StateObject;
}
- else
+ catch (const std::exception& Ex)
{
- // Slow path: TempDir and ServerStateDir are on different filesystems, so rename
- // would fail. Copy the tree instead and clean up the temp files afterwards.
- ZEN_DEBUG("TempDir and ServerStateDir are on different filesystems - using CopyTree");
- CopyTree(m_Config.TempDir, m_Config.ServerStateDir, {.EnableClone = true});
+ ZEN_WARN("Hydration of module '{}' failed: {}. Cleaning server state '{}'",
+ m_Config.ModuleId,
+ Ex.what(),
+ m_Config.ServerStateDir);
+ CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, ServerStateDir);
ZEN_DEBUG("Cleaning temp dir '{}'", m_Config.TempDir);
- CleanDirectory(m_Config.TempDir, ForceRemoveReadOnlyFiles);
+ CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, TempDir);
+ return {};
}
-
- ZEN_INFO("Hydration complete from folder '{}'", FolderName);
}
- catch (std::exception& Ex)
+
+ void IncrementalHydrator::Obliterate()
{
- ZEN_WARN("S3 hydration failed: {}. Will wipe any partially installed state.", Ex.what());
+ const std::filesystem::path ServerStateDir = MakeSafeAbsolutePath(m_Config.ServerStateDir);
+ const std::filesystem::path TempDir = MakeSafeAbsolutePath(m_Config.TempDir);
+
+ try
+ {
+ ParallelWork Work(*m_Threading.AbortFlag, *m_Threading.PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
+ m_Storage->Delete(Work, *m_Threading.WorkerPool);
+ Work.Wait();
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_WARN("Failed to delete backend storage for module '{}': {}. Proceeding with local cleanup.", m_Config.ModuleId, Ex.what());
+ }
- // We don't do the clean right here to avoid potentially running into double-throws
- WipeServerState = true;
+ CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, ServerStateDir);
+ CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, TempDir);
}
- if (WipeServerState)
+} // namespace hydration_impl
+
+///////////////////////////////////////////////////////////////////////////
+// HydrationBase subclasses - own hub-wide backend state, hand per-module
+// storages the exact inputs they need in CreateHydrator.
+
+class FileHydration : public HydrationBase
+{
+public:
+ explicit FileHydration(const Configuration& Config);
+
+ virtual std::unique_ptr<HydrationStrategyBase> CreateHydrator(const HydrationConfig& Config) override;
+
+private:
+ std::filesystem::path m_StorageRoot;
+};
+
+class S3Hydration : public HydrationBase
+{
+public:
+ explicit S3Hydration(const Configuration& Config);
+
+ virtual std::unique_ptr<HydrationStrategyBase> CreateHydrator(const HydrationConfig& Config) override;
+
+private:
+ std::string m_Bucket;
+ std::string m_Region;
+ std::string m_Endpoint;
+ bool m_PathStyle = false;
+ std::string m_KeyPrefixRoot;
+ SigV4Credentials m_Credentials;
+ Ref<ImdsCredentialProvider> m_CredentialProvider;
+ std::unique_ptr<S3Client> m_Client;
+ uint64_t m_DefaultMultipartChunkSize;
+};
+
+///////////////////////////////////////////////////////////////////////////
+// Implementations
+
+FileHydration::FileHydration(const Configuration& Config)
+{
+ if (!Config.TargetSpecification.empty())
+ {
+ m_StorageRoot = Utf8ToWide(Config.TargetSpecification.substr(hydration_impl::FileStorage::Prefix.length()));
+ if (m_StorageRoot.empty())
+ {
+ throw zen::runtime_error("Hydration config 'file' type requires a directory path");
+ }
+ }
+ else
{
- ZEN_DEBUG("Cleaning server state '{}'", m_Config.ServerStateDir);
- CleanDirectory(m_Config.ServerStateDir, ForceRemoveReadOnlyFiles);
- ZEN_DEBUG("Cleaning temp dir '{}'", m_Config.TempDir);
- CleanDirectory(m_Config.TempDir, ForceRemoveReadOnlyFiles);
+ CbObjectView Settings = Config.Options["settings"].AsObjectView();
+ std::string_view Path = Settings["path"].AsString();
+ if (Path.empty())
+ {
+ throw zen::runtime_error("Hydration config 'file' type requires 'settings.path'");
+ }
+ m_StorageRoot = Utf8ToWide(std::string(Path));
}
+ MakeSafeAbsolutePathInPlace(m_StorageRoot);
}
std::unique_ptr<HydrationStrategyBase>
-CreateHydrator(const HydrationConfig& Config)
+FileHydration::CreateHydrator(const HydrationConfig& Config)
{
- if (StrCaseCompare(Config.TargetSpecification.substr(0, FileHydratorPrefix.length()), FileHydratorPrefix) == 0)
+ using namespace hydration_impl;
+ return std::make_unique<IncrementalHydrator>(Config, std::make_unique<FileStorage>(m_StorageRoot / Config.ModuleId));
+}
+
+S3Hydration::S3Hydration(const Configuration& Config)
+{
+ using namespace hydration_impl;
+
+ CbObjectView Settings = Config.Options["settings"].AsObjectView();
+ std::string_view Spec;
+ if (!Config.TargetSpecification.empty())
{
- std::unique_ptr<HydrationStrategyBase> Hydrator = std::make_unique<FileHydrator>();
- Hydrator->Configure(Config);
- return Hydrator;
+ Spec = Config.TargetSpecification;
+ Spec.remove_prefix(S3Storage::Prefix.size());
}
- if (StrCaseCompare(Config.TargetSpecification.substr(0, S3HydratorPrefix.length()), S3HydratorPrefix) == 0)
+ else
{
- std::unique_ptr<HydrationStrategyBase> Hydrator = std::make_unique<S3Hydrator>();
- Hydrator->Configure(Config);
- return Hydrator;
+ std::string_view Uri = Settings["uri"].AsString();
+ if (Uri.empty())
+ {
+ throw zen::runtime_error("Incremental S3 hydration config requires 'settings.uri'");
+ }
+ Spec = Uri;
+ Spec.remove_prefix(S3Storage::Prefix.size());
}
- throw std::runtime_error(fmt::format("Unknown hydration strategy: {}", Config.TargetSpecification));
-}
-#if ZEN_WITH_TESTS
+ size_t SlashPos = Spec.find('/');
+ m_Bucket = std::string(SlashPos != std::string_view::npos ? Spec.substr(0, SlashPos) : Spec);
+ m_KeyPrefixRoot = SlashPos != std::string_view::npos ? std::string(Spec.substr(SlashPos + 1)) : std::string{};
-namespace {
+ if (m_Bucket.empty())
+ {
+ throw zen::runtime_error("Incremental S3 hydration config requires a bucket name");
+ }
+
+ std::string Region = std::string(Settings["region"].AsString());
+ if (Region.empty())
+ {
+ Region = GetEnvVariable("AWS_DEFAULT_REGION");
+ }
+ if (Region.empty())
+ {
+ Region = GetEnvVariable("AWS_REGION");
+ }
+ if (Region.empty())
+ {
+ Region = "us-east-1";
+ }
+ m_Region = std::move(Region);
+
+ std::string_view Endpoint = Settings["endpoint"].AsString();
+ if (!Endpoint.empty())
+ {
+ m_Endpoint = std::string(Endpoint);
+ m_PathStyle = Settings["path-style"].AsBool();
+ }
+
+ std::string AccessKeyId = GetEnvVariable("AWS_ACCESS_KEY_ID");
+ if (AccessKeyId.empty())
+ {
+ m_CredentialProvider = Ref<ImdsCredentialProvider>(new ImdsCredentialProvider({}));
+ }
+ else
+ {
+ m_Credentials.AccessKeyId = std::move(AccessKeyId);
+ m_Credentials.SecretAccessKey = GetEnvVariable("AWS_SECRET_ACCESS_KEY");
+ m_Credentials.SessionToken = GetEnvVariable("AWS_SESSION_TOKEN");
+ }
+
+ m_DefaultMultipartChunkSize = Settings["chunksize"].AsUInt64(S3Storage::DefaultMultipartChunkSize);
- /// Scoped RAII helper to set/restore a single environment variable within a test.
- /// Used to configure AWS credentials for each S3 test's MinIO instance
- /// without polluting the global environment.
- struct ScopedEnvVar
+ S3ClientOptions ClientOptions;
+ ClientOptions.BucketName = m_Bucket;
+ ClientOptions.Region = m_Region;
+ ClientOptions.Endpoint = m_Endpoint;
+ ClientOptions.PathStyle = m_PathStyle;
+ if (m_CredentialProvider)
{
- std::string m_Name;
- std::optional<std::string> m_OldValue; // nullopt = was not set; "" = was set to empty string
+ ClientOptions.CredentialProvider = m_CredentialProvider;
+ }
+ else
+ {
+ ClientOptions.Credentials = m_Credentials;
+ }
+ ClientOptions.HttpSettings.MaximumInMemoryDownloadSize = 16u * 1024u;
+
+ m_Client = std::make_unique<S3Client>(ClientOptions);
+}
+
+std::unique_ptr<HydrationStrategyBase>
+S3Hydration::CreateHydrator(const HydrationConfig& Config)
+{
+ using namespace hydration_impl;
+ std::string KeyPrefix = m_KeyPrefixRoot.empty() ? std::string(Config.ModuleId) : fmt::format("{}/{}", m_KeyPrefixRoot, Config.ModuleId);
+ return std::make_unique<IncrementalHydrator>(
+ Config,
+ std::make_unique<S3Storage>(*m_Client, std::move(KeyPrefix), Config.TempDir, m_DefaultMultipartChunkSize));
+}
- ScopedEnvVar(std::string_view Name, std::string_view Value) : m_Name(Name)
+std::unique_ptr<HydrationBase>
+InitHydration(const HydrationBase::Configuration& Config)
+{
+ using namespace hydration_impl;
+
+ if (!Config.TargetSpecification.empty())
+ {
+ if (StrCaseCompare(Config.TargetSpecification.substr(0, FileStorage::Prefix.length()), FileStorage::Prefix) == 0)
{
-# if ZEN_PLATFORM_WINDOWS
- // Use the raw API so we can distinguish "not set" (ERROR_ENVVAR_NOT_FOUND)
- // from "set to empty string" (returns 0 with no error).
- char Buf[1];
- DWORD Len = GetEnvironmentVariableA(m_Name.c_str(), Buf, sizeof(Buf));
- if (Len == 0 && GetLastError() == ERROR_ENVVAR_NOT_FOUND)
- {
- m_OldValue = std::nullopt;
- }
- else
- {
- // Len == 0 with no error: variable exists but is empty.
- // Len > sizeof(Buf): value is non-empty; Len is the required buffer size
- // (including null terminator) - allocate and re-read.
- std::string Old(Len == 0 ? 0 : Len - 1, '\0');
- if (Len > sizeof(Buf))
- {
- GetEnvironmentVariableA(m_Name.c_str(), Old.data(), Len);
- }
- m_OldValue = std::move(Old);
- }
- SetEnvironmentVariableA(m_Name.c_str(), std::string(Value).c_str());
-# else
- // getenv returns nullptr when not set, "" when set to empty string.
- const char* Existing = getenv(m_Name.c_str());
- m_OldValue = Existing ? std::optional<std::string>(Existing) : std::nullopt;
- setenv(m_Name.c_str(), std::string(Value).c_str(), 1);
-# endif
+ return std::make_unique<FileHydration>(Config);
}
- ~ScopedEnvVar()
+ if (StrCaseCompare(Config.TargetSpecification.substr(0, S3Storage::Prefix.length()), S3Storage::Prefix) == 0)
{
-# if ZEN_PLATFORM_WINDOWS
- SetEnvironmentVariableA(m_Name.c_str(), m_OldValue.has_value() ? m_OldValue->c_str() : nullptr);
-# else
- if (m_OldValue.has_value())
- {
- setenv(m_Name.c_str(), m_OldValue->c_str(), 1);
- }
- else
- {
- unsetenv(m_Name.c_str());
- }
-# endif
+ return std::make_unique<S3Hydration>(Config);
}
+ throw zen::runtime_error("Unknown hydration strategy: {}", Config.TargetSpecification);
+ }
+
+ std::string_view Type = Config.Options["type"].AsString();
+ if (Type == FileStorage::Type)
+ {
+ return std::make_unique<FileHydration>(Config);
+ }
+ if (Type == S3Storage::Type)
+ {
+ return std::make_unique<S3Hydration>(Config);
+ }
+ if (!Type.empty())
+ {
+ throw zen::runtime_error("Unknown hydration target type '{}'", Type);
+ }
+ throw zen::runtime_error("No hydration target configured");
+}
+
+#if ZEN_WITH_TESTS
+
+namespace {
+
+ struct TestThreading
+ {
+ WorkerThreadPool WorkerPool;
+ std::atomic<bool> AbortFlag{false};
+ std::atomic<bool> PauseFlag{false};
+ HydrationConfig::ThreadingOptions Options{.WorkerPool = &WorkerPool, .AbortFlag = &AbortFlag, .PauseFlag = &PauseFlag};
+
+ explicit TestThreading(int ThreadCount) : WorkerPool(ThreadCount) {}
};
/// Create a small file hierarchy under BaseDir:
@@ -593,10 +1222,10 @@ namespace {
/// subdir/file_b.bin
/// subdir/nested/file_c.bin
/// Returns a vector of (relative path, content) pairs for later verification.
- std::vector<std::pair<std::filesystem::path, IoBuffer>> CreateTestTree(const std::filesystem::path& BaseDir)
- {
- std::vector<std::pair<std::filesystem::path, IoBuffer>> Files;
+ typedef std::vector<std::pair<std::filesystem::path, IoBuffer>> TestFileList;
+ TestFileList AddTestFiles(const std::filesystem::path& BaseDir, TestFileList& Files)
+ {
auto AddFile = [&](std::filesystem::path RelPath, IoBuffer Content) {
std::filesystem::path FullPath = BaseDir / RelPath;
CreateDirectories(FullPath.parent_path());
@@ -607,6 +1236,36 @@ namespace {
AddFile("file_a.bin", CreateSemiRandomBlob(1024));
AddFile("subdir/file_b.bin", CreateSemiRandomBlob(2048));
AddFile("subdir/nested/file_c.bin", CreateSemiRandomBlob(512));
+ AddFile("subdir/nested/file_d.bin", CreateSemiRandomBlob(512));
+ AddFile("subdir/nested/file_e.bin", CreateSemiRandomBlob(512));
+ AddFile("subdir/nested/file_f.bin", CreateSemiRandomBlob(512));
+
+ return Files;
+ }
+
+ TestFileList CreateSmallTestTree(const std::filesystem::path& BaseDir)
+ {
+ TestFileList Files;
+ AddTestFiles(BaseDir, Files);
+ return Files;
+ }
+
+ TestFileList CreateTestTree(const std::filesystem::path& BaseDir)
+ {
+ TestFileList Files;
+ AddTestFiles(BaseDir, Files);
+
+ auto AddFile = [&](std::filesystem::path RelPath, IoBuffer Content) {
+ std::filesystem::path FullPath = BaseDir / RelPath;
+ CreateDirectories(FullPath.parent_path());
+ WriteFile(FullPath, Content);
+ Files.emplace_back(std::move(RelPath), std::move(Content));
+ };
+
+ AddFile("subdir/nested/medium.bulk", CreateSemiRandomBlob(256u * 1024u));
+ AddFile("subdir/nested/big.bulk", CreateSemiRandomBlob(512u * 1024u));
+ AddFile("subdir/nested/huge.bulk", CreateSemiRandomBlob(9u * 1024u * 1024u));
+ AddFile("subdir/nested/biggest.bulk", CreateSemiRandomBlob(63u * 1024u * 1024u));
return Files;
}
@@ -644,35 +1303,27 @@ TEST_CASE("hydration.file.dehydrate_hydrate")
CreateDirectories(HydrationTemp);
const std::string ModuleId = "testmodule";
- auto TestFiles = CreateTestTree(ServerStateDir);
+ auto TestFiles = CreateSmallTestTree(ServerStateDir);
+
+ auto Hydration = InitHydration({.TargetSpecification = "file://" + HydrationStore.string()});
- HydrationConfig Config;
- Config.ServerStateDir = ServerStateDir;
- Config.TempDir = HydrationTemp;
- Config.ModuleId = ModuleId;
- Config.TargetSpecification = "file://" + HydrationStore.string();
+ HydrationConfig Config{.ServerStateDir = ServerStateDir, .TempDir = HydrationTemp, .ModuleId = ModuleId};
// Dehydrate: copy server state to file store
- {
- std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
- Hydrator->Dehydrate();
- }
+ Hydration->CreateHydrator(Config)->Dehydrate(CbObject());
// Verify the module folder exists in the store and ServerStateDir was wiped
CHECK(std::filesystem::exists(HydrationStore / ModuleId));
CHECK(std::filesystem::is_empty(ServerStateDir));
// Hydrate: restore server state from file store
- {
- std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
- Hydrator->Hydrate();
- }
+ Hydration->CreateHydrator(Config)->Hydrate();
// Verify restored contents match the original
VerifyTree(ServerStateDir, TestFiles);
}
-TEST_CASE("hydration.file.dehydrate_cleans_server_state")
+TEST_CASE("hydration.file.hydrate_overwrites_existing_state")
{
ScopedTemporaryDirectory TempDir;
@@ -683,22 +1334,25 @@ TEST_CASE("hydration.file.dehydrate_cleans_server_state")
CreateDirectories(HydrationStore);
CreateDirectories(HydrationTemp);
- CreateTestTree(ServerStateDir);
+ auto TestFiles = CreateSmallTestTree(ServerStateDir);
- HydrationConfig Config;
- Config.ServerStateDir = ServerStateDir;
- Config.TempDir = HydrationTemp;
- Config.ModuleId = "testmodule";
- Config.TargetSpecification = "file://" + HydrationStore.string();
+ auto Hydration = InitHydration({.TargetSpecification = "file://" + HydrationStore.string()});
- std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
- Hydrator->Dehydrate();
+ HydrationConfig Config{.ServerStateDir = ServerStateDir, .TempDir = HydrationTemp, .ModuleId = "testmodule"};
- // FileHydrator::Dehydrate() must wipe ServerStateDir when done
- CHECK(std::filesystem::is_empty(ServerStateDir));
+ Hydration->CreateHydrator(Config)->Dehydrate(CbObject());
+
+ // Put a stale file in ServerStateDir to simulate leftover state
+ WriteFile(ServerStateDir / "stale.bin", CreateSemiRandomBlob(256));
+
+ // Hydrate - must wipe stale file and restore original
+ Hydration->CreateHydrator(Config)->Hydrate();
+
+ CHECK_FALSE(std::filesystem::exists(ServerStateDir / "stale.bin"));
+ VerifyTree(ServerStateDir, TestFiles);
}
-TEST_CASE("hydration.file.hydrate_overwrites_existing_state")
+TEST_CASE("hydration.file.excluded_files_not_dehydrated")
{
ScopedTemporaryDirectory TempDir;
@@ -709,31 +1363,70 @@ TEST_CASE("hydration.file.hydrate_overwrites_existing_state")
CreateDirectories(HydrationStore);
CreateDirectories(HydrationTemp);
- auto TestFiles = CreateTestTree(ServerStateDir);
+ auto TestFiles = CreateSmallTestTree(ServerStateDir);
- HydrationConfig Config;
- Config.ServerStateDir = ServerStateDir;
- Config.TempDir = HydrationTemp;
- Config.ModuleId = "testmodule";
- Config.TargetSpecification = "file://" + HydrationStore.string();
+ // Add files that the dehydrator should skip
+ WriteFile(ServerStateDir / "reserve.gc", CreateSemiRandomBlob(64));
+ CreateDirectories(ServerStateDir / ".sentry-native");
+ WriteFile(ServerStateDir / ".sentry-native" / "db.lock", CreateSemiRandomBlob(32));
+ WriteFile(ServerStateDir / ".sentry-native" / "breadcrumb.json", CreateSemiRandomBlob(128));
- // Dehydrate the original state
- {
- std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
- Hydrator->Dehydrate();
- }
+ auto Hydration = InitHydration({.TargetSpecification = "file://" + HydrationStore.string()});
- // Put a stale file in ServerStateDir to simulate leftover state
- WriteFile(ServerStateDir / "stale.bin", CreateSemiRandomBlob(256));
+ HydrationConfig Config{.ServerStateDir = ServerStateDir, .TempDir = HydrationTemp, .ModuleId = "testmodule_excl"};
- // Hydrate - must wipe stale file and restore original
- {
- std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
- Hydrator->Hydrate();
- }
+ Hydration->CreateHydrator(Config)->Dehydrate(CbObject());
- CHECK_FALSE(std::filesystem::exists(ServerStateDir / "stale.bin"));
+ // Hydrate into a clean directory
+ CleanDirectory(ServerStateDir, true);
+ Hydration->CreateHydrator(Config)->Hydrate();
+
+ // Normal files must be restored
VerifyTree(ServerStateDir, TestFiles);
+ // Excluded files must NOT be restored
+ CHECK_FALSE(std::filesystem::exists(ServerStateDir / "reserve.gc"));
+ CHECK_FALSE(std::filesystem::exists(ServerStateDir / ".sentry-native"));
+}
+
+// ---------------------------------------------------------------------------
+// FileHydrator obliterate test
+// ---------------------------------------------------------------------------
+
+TEST_CASE("hydration.file.obliterate")
+{
+ ScopedTemporaryDirectory TempDir;
+
+ std::filesystem::path ServerStateDir = TempDir.Path() / "server_state";
+ std::filesystem::path HydrationStore = TempDir.Path() / "hydration_store";
+ std::filesystem::path HydrationTemp = TempDir.Path() / "hydration_temp";
+ CreateDirectories(ServerStateDir);
+ CreateDirectories(HydrationStore);
+ CreateDirectories(HydrationTemp);
+
+ const std::string ModuleId = "obliterate_test";
+ CreateSmallTestTree(ServerStateDir);
+
+ auto Hydration = InitHydration({.TargetSpecification = "file://" + HydrationStore.string()});
+
+ HydrationConfig Config{.ServerStateDir = ServerStateDir, .TempDir = HydrationTemp, .ModuleId = ModuleId};
+
+ // Dehydrate so the backend store has data
+ Hydration->CreateHydrator(Config)->Dehydrate(CbObject());
+ CHECK(std::filesystem::exists(HydrationStore / ModuleId));
+
+ // Put some files back in ServerStateDir and TempDir to verify cleanup
+ CreateSmallTestTree(ServerStateDir);
+ WriteFile(HydrationTemp / "leftover.tmp", CreateSemiRandomBlob(64));
+
+ // Obliterate
+ Hydration->CreateHydrator(Config)->Obliterate();
+
+ // Backend store directory deleted
+ CHECK_FALSE(std::filesystem::exists(HydrationStore / ModuleId));
+ // ServerStateDir cleaned
+ CHECK(std::filesystem::is_empty(ServerStateDir));
+ // TempDir cleaned
+ CHECK(std::filesystem::is_empty(HydrationTemp));
}
// ---------------------------------------------------------------------------
@@ -750,6 +1443,8 @@ TEST_CASE("hydration.file.concurrent")
std::filesystem::path HydrationStore = TempDir.Path() / "hydration_store";
CreateDirectories(HydrationStore);
+ TestThreading Threading(8);
+
struct ModuleData
{
HydrationConfig Config;
@@ -757,6 +1452,8 @@ TEST_CASE("hydration.file.concurrent")
};
std::vector<ModuleData> Modules(kModuleCount);
+ auto Hydration = InitHydration({.TargetSpecification = "file://" + HydrationStore.string()});
+
for (int I = 0; I < kModuleCount; ++I)
{
std::string ModuleId = fmt::format("file_concurrent_{}", I);
@@ -765,11 +1462,11 @@ TEST_CASE("hydration.file.concurrent")
CreateDirectories(StateDir);
CreateDirectories(TempPath);
- Modules[I].Config.ServerStateDir = StateDir;
- Modules[I].Config.TempDir = TempPath;
- Modules[I].Config.ModuleId = ModuleId;
- Modules[I].Config.TargetSpecification = "file://" + HydrationStore.string();
- Modules[I].Files = CreateTestTree(StateDir);
+ Modules[I].Config.ServerStateDir = StateDir;
+ Modules[I].Config.TempDir = TempPath;
+ Modules[I].Config.ModuleId = ModuleId;
+ Modules[I].Config.Threading = Threading.Options;
+ Modules[I].Files = CreateSmallTestTree(StateDir);
}
// Concurrent dehydrate
@@ -781,9 +1478,8 @@ TEST_CASE("hydration.file.concurrent")
for (int I = 0; I < kModuleCount; ++I)
{
- Work.ScheduleWork(Pool, [&Config = Modules[I].Config](std::atomic<bool>&) {
- std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
- Hydrator->Dehydrate();
+ Work.ScheduleWork(Pool, [&Hydration, &Config = Modules[I].Config](std::atomic<bool>&) {
+ Hydration->CreateHydrator(Config)->Dehydrate(CbObject());
});
}
Work.Wait();
@@ -799,9 +1495,8 @@ TEST_CASE("hydration.file.concurrent")
for (int I = 0; I < kModuleCount; ++I)
{
- Work.ScheduleWork(Pool, [&Config = Modules[I].Config](std::atomic<bool>&) {
- std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
- Hydrator->Hydrate();
+ Work.ScheduleWork(Pool, [&Hydration, &Config = Modules[I].Config](std::atomic<bool>&) {
+ Hydration->CreateHydrator(Config)->Hydrate();
});
}
Work.Wait();
@@ -818,14 +1513,14 @@ TEST_CASE("hydration.file.concurrent")
// ---------------------------------------------------------------------------
// S3Hydrator tests
//
-// Each test case spawns its own local MinIO instance (self-contained, no external setup needed).
+// Each test case spawns a local MinIO instance (self-contained, no external setup needed).
// The MinIO binary must be present in the same directory as the test executable (copied by xmake).
// ---------------------------------------------------------------------------
TEST_CASE("hydration.s3.dehydrate_hydrate")
{
MinioProcessOptions MinioOpts;
- MinioOpts.Port = 19010;
+ MinioOpts.Port = 19011;
MinioProcess Minio(MinioOpts);
Minio.SpawnMinioServer();
Minio.CreateBucket("zen-hydration-test");
@@ -840,168 +1535,45 @@ TEST_CASE("hydration.s3.dehydrate_hydrate")
CreateDirectories(ServerStateDir);
CreateDirectories(HydrationTemp);
- const std::string ModuleId = "s3test_roundtrip";
- auto TestFiles = CreateTestTree(ServerStateDir);
-
- HydrationConfig Config;
- Config.ServerStateDir = ServerStateDir;
- Config.TempDir = HydrationTemp;
- Config.ModuleId = ModuleId;
- Config.TargetSpecification = "s3://zen-hydration-test";
- Config.S3Endpoint = Minio.Endpoint();
- Config.S3PathStyle = true;
-
- // Dehydrate: upload server state to MinIO
- {
- std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
- Hydrator->Dehydrate();
- }
-
- // Wipe server state
- CleanDirectory(ServerStateDir, true);
- CHECK(std::filesystem::is_empty(ServerStateDir));
-
- // Hydrate: download from MinIO back to server state
+ HydrationBase::Configuration BaseConfig;
{
- std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
- Hydrator->Hydrate();
+ std::string ConfigJson =
+ fmt::format(R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test","endpoint":"{}","path-style":true}}}})",
+ Minio.Endpoint());
+ std::string ParseError;
+ CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError);
+ ZEN_ASSERT(ParseError.empty() && Root.IsObject());
+ BaseConfig.Options = std::move(Root).AsObject();
}
+ auto Hydration = InitHydration(BaseConfig);
- // Verify restored contents match the original
- VerifyTree(ServerStateDir, TestFiles);
-}
-
-TEST_CASE("hydration.s3.current_state_json_selects_latest_folder")
-{
- // Each Dehydrate() uploads files to a new timestamp-named folder and then overwrites
- // current-state.json to point at that folder. Old folders are NOT deleted.
- // Hydrate() must read current-state.json to determine which folder to restore from.
- //
- // This test verifies that:
- // 1. After two dehydrations, Hydrate() restores from the second snapshot, not the first,
- // confirming that current-state.json was updated between dehydrations.
- // 2. current-state.json is updated to point at the second (latest) folder.
- // 3. Hydrate() restores the v2 snapshot (identified by v2marker.bin), NOT the v1 snapshot.
-
- MinioProcessOptions MinioOpts;
- MinioOpts.Port = 19011;
- MinioProcess Minio(MinioOpts);
- Minio.SpawnMinioServer();
- Minio.CreateBucket("zen-hydration-test");
-
- ScopedEnvVar EnvAccessKey("AWS_ACCESS_KEY_ID", Minio.RootUser());
- ScopedEnvVar EnvSecretKey("AWS_SECRET_ACCESS_KEY", Minio.RootPassword());
-
- ScopedTemporaryDirectory TempDir;
-
- std::filesystem::path ServerStateDir = TempDir.Path() / "server_state";
- std::filesystem::path HydrationTemp = TempDir.Path() / "hydration_temp";
- CreateDirectories(ServerStateDir);
- CreateDirectories(HydrationTemp);
-
- const std::string ModuleId = "s3test_folder_select";
+ HydrationConfig Config{.ServerStateDir = ServerStateDir, .TempDir = HydrationTemp, .ModuleId = "s3test_roundtrip"};
- HydrationConfig Config;
- Config.ServerStateDir = ServerStateDir;
- Config.TempDir = HydrationTemp;
- Config.ModuleId = ModuleId;
- Config.TargetSpecification = "s3://zen-hydration-test";
- Config.S3Endpoint = Minio.Endpoint();
- Config.S3PathStyle = true;
+ // Hydrate with no prior S3 state (first-boot path). Pre-populate ServerStateDir
+ // with a stale file to confirm the cleanup branch wipes it.
+ WriteFile(ServerStateDir / "stale.bin", CreateSemiRandomBlob(256));
+ Hydration->CreateHydrator(Config)->Hydrate();
+ CHECK(std::filesystem::is_empty(ServerStateDir));
// v1: dehydrate without a marker file
- CreateTestTree(ServerStateDir);
- {
- std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
- Hydrator->Dehydrate();
- }
-
- // ServerStateDir is now empty. Wait so the v2 timestamp folder name is strictly later
- // (timestamp resolution is 1 ms, but macOS scheduler granularity requires a larger margin).
- Sleep(100);
+ CreateSmallTestTree(ServerStateDir);
+ Hydration->CreateHydrator(Config)->Dehydrate(CbObject());
// v2: dehydrate WITH a marker file that only v2 has
- CreateTestTree(ServerStateDir);
+ CreateSmallTestTree(ServerStateDir);
WriteFile(ServerStateDir / "v2marker.bin", CreateSemiRandomBlob(64));
- {
- std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
- Hydrator->Dehydrate();
- }
+ Hydration->CreateHydrator(Config)->Dehydrate(CbObject());
- // Hydrate must restore v2 (current-state.json points to the v2 folder)
+ // Hydrate must restore v2 (the latest dehydrated state)
CleanDirectory(ServerStateDir, true);
- {
- std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
- Hydrator->Hydrate();
- }
+ Hydration->CreateHydrator(Config)->Hydrate();
- // v2 marker must be present - confirms current-state.json pointed to the v2 folder
+ // v2 marker must be present - confirms the second dehydration overwrote the first
CHECK(std::filesystem::exists(ServerStateDir / "v2marker.bin"));
- // Subdirectory hierarchy must also be intact
CHECK(std::filesystem::exists(ServerStateDir / "subdir" / "file_b.bin"));
CHECK(std::filesystem::exists(ServerStateDir / "subdir" / "nested" / "file_c.bin"));
}
-TEST_CASE("hydration.s3.module_isolation")
-{
- // Two independent modules dehydrate/hydrate without interfering with each other.
- // Uses VerifyTree with per-module byte content to detect cross-module data mixing.
- MinioProcessOptions MinioOpts;
- MinioOpts.Port = 19012;
- MinioProcess Minio(MinioOpts);
- Minio.SpawnMinioServer();
- Minio.CreateBucket("zen-hydration-test");
-
- ScopedEnvVar EnvAccessKey("AWS_ACCESS_KEY_ID", Minio.RootUser());
- ScopedEnvVar EnvSecretKey("AWS_SECRET_ACCESS_KEY", Minio.RootPassword());
-
- ScopedTemporaryDirectory TempDir;
-
- struct ModuleData
- {
- HydrationConfig Config;
- std::vector<std::pair<std::filesystem::path, IoBuffer>> Files;
- };
-
- std::vector<ModuleData> Modules;
- for (const char* ModuleId : {"s3test_iso_a", "s3test_iso_b"})
- {
- std::filesystem::path StateDir = TempDir.Path() / ModuleId / "state";
- std::filesystem::path TempPath = TempDir.Path() / ModuleId / "temp";
- CreateDirectories(StateDir);
- CreateDirectories(TempPath);
-
- ModuleData Data;
- Data.Config.ServerStateDir = StateDir;
- Data.Config.TempDir = TempPath;
- Data.Config.ModuleId = ModuleId;
- Data.Config.TargetSpecification = "s3://zen-hydration-test";
- Data.Config.S3Endpoint = Minio.Endpoint();
- Data.Config.S3PathStyle = true;
- Data.Files = CreateTestTree(StateDir);
-
- std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Data.Config);
- Hydrator->Dehydrate();
-
- Modules.push_back(std::move(Data));
- }
-
- for (ModuleData& Module : Modules)
- {
- CleanDirectory(Module.Config.ServerStateDir, true);
- std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Module.Config);
- Hydrator->Hydrate();
-
- // Each module's files must be independently restorable with correct byte content.
- // If S3 key prefixes were mixed up, CreateSemiRandomBlob content would differ.
- VerifyTree(Module.Config.ServerStateDir, Module.Files);
- }
-}
-
-// ---------------------------------------------------------------------------
-// S3Hydrator concurrent test
-// ---------------------------------------------------------------------------
-
TEST_CASE("hydration.s3.concurrent")
{
// N modules dehydrate and hydrate concurrently against MinIO.
@@ -1015,7 +1587,10 @@ TEST_CASE("hydration.s3.concurrent")
ScopedEnvVar EnvAccessKey("AWS_ACCESS_KEY_ID", Minio.RootUser());
ScopedEnvVar EnvSecretKey("AWS_SECRET_ACCESS_KEY", Minio.RootPassword());
- constexpr int kModuleCount = 4;
+ constexpr int kModuleCount = 6;
+ constexpr int kThreadCount = 4;
+
+ TestThreading Threading(kThreadCount);
ScopedTemporaryDirectory TempDir;
@@ -1026,6 +1601,18 @@ TEST_CASE("hydration.s3.concurrent")
};
std::vector<ModuleData> Modules(kModuleCount);
+ HydrationBase::Configuration BaseConfig;
+ {
+ std::string ConfigJson =
+ fmt::format(R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test","endpoint":"{}","path-style":true}}}})",
+ Minio.Endpoint());
+ std::string ParseError;
+ CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError);
+ ZEN_ASSERT(ParseError.empty() && Root.IsObject());
+ BaseConfig.Options = std::move(Root).AsObject();
+ }
+ auto Hydration = InitHydration(BaseConfig);
+
for (int I = 0; I < kModuleCount; ++I)
{
std::string ModuleId = fmt::format("s3_concurrent_{}", I);
@@ -1034,27 +1621,24 @@ TEST_CASE("hydration.s3.concurrent")
CreateDirectories(StateDir);
CreateDirectories(TempPath);
- Modules[I].Config.ServerStateDir = StateDir;
- Modules[I].Config.TempDir = TempPath;
- Modules[I].Config.ModuleId = ModuleId;
- Modules[I].Config.TargetSpecification = "s3://zen-hydration-test";
- Modules[I].Config.S3Endpoint = Minio.Endpoint();
- Modules[I].Config.S3PathStyle = true;
- Modules[I].Files = CreateTestTree(StateDir);
+ Modules[I].Config.ServerStateDir = StateDir;
+ Modules[I].Config.TempDir = TempPath;
+ Modules[I].Config.ModuleId = ModuleId;
+ Modules[I].Config.Threading = Threading.Options;
+ Modules[I].Files = CreateTestTree(StateDir);
}
// Concurrent dehydrate
{
- WorkerThreadPool Pool(kModuleCount, "hydration_s3_dehy");
+ WorkerThreadPool Pool(kThreadCount, "hydration_s3_dehy");
std::atomic<bool> AbortFlag{false};
std::atomic<bool> PauseFlag{false};
ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
for (int I = 0; I < kModuleCount; ++I)
{
- Work.ScheduleWork(Pool, [&Config = Modules[I].Config](std::atomic<bool>&) {
- std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
- Hydrator->Dehydrate();
+ Work.ScheduleWork(Pool, [&Hydration, &Config = Modules[I].Config](std::atomic<bool>&) {
+ Hydration->CreateHydrator(Config)->Dehydrate(CbObject());
});
}
Work.Wait();
@@ -1063,17 +1647,16 @@ TEST_CASE("hydration.s3.concurrent")
// Concurrent hydrate
{
- WorkerThreadPool Pool(kModuleCount, "hydration_s3_hy");
+ WorkerThreadPool Pool(kThreadCount, "hydration_s3_hy");
std::atomic<bool> AbortFlag{false};
std::atomic<bool> PauseFlag{false};
ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
for (int I = 0; I < kModuleCount; ++I)
{
- Work.ScheduleWork(Pool, [&Config = Modules[I].Config](std::atomic<bool>&) {
+ Work.ScheduleWork(Pool, [&Hydration, &Config = Modules[I].Config](std::atomic<bool>&) {
CleanDirectory(Config.ServerStateDir, true);
- std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
- Hydrator->Hydrate();
+ Hydration->CreateHydrator(Config)->Hydrate();
});
}
Work.Wait();
@@ -1087,17 +1670,76 @@ TEST_CASE("hydration.s3.concurrent")
}
}
-// ---------------------------------------------------------------------------
-// S3Hydrator: no prior state (first-boot path)
-// ---------------------------------------------------------------------------
+TEST_CASE("hydration.s3.obliterate")
+{
+ MinioProcessOptions MinioOpts;
+ MinioOpts.Port = 19019;
+ MinioProcess Minio(MinioOpts);
+ Minio.SpawnMinioServer();
+ Minio.CreateBucket("zen-hydration-test");
-TEST_CASE("hydration.s3.no_prior_state")
+ ScopedEnvVar EnvAccessKey("AWS_ACCESS_KEY_ID", Minio.RootUser());
+ ScopedEnvVar EnvSecretKey("AWS_SECRET_ACCESS_KEY", Minio.RootPassword());
+
+ ScopedTemporaryDirectory TempDir;
+
+ std::filesystem::path ServerStateDir = TempDir.Path() / "server_state";
+ std::filesystem::path HydrationTemp = TempDir.Path() / "hydration_temp";
+ CreateDirectories(ServerStateDir);
+ CreateDirectories(HydrationTemp);
+
+ const std::string ModuleId = "s3test_obliterate";
+
+ HydrationBase::Configuration BaseConfig;
+ {
+ std::string ConfigJson =
+ fmt::format(R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test","endpoint":"{}","path-style":true}}}})",
+ Minio.Endpoint());
+ std::string ParseError;
+ CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError);
+ ZEN_ASSERT(ParseError.empty() && Root.IsObject());
+ BaseConfig.Options = std::move(Root).AsObject();
+ }
+ auto Hydration = InitHydration(BaseConfig);
+
+ HydrationConfig Config{.ServerStateDir = ServerStateDir, .TempDir = HydrationTemp, .ModuleId = ModuleId};
+
+ // Dehydrate to populate backend
+ CreateSmallTestTree(ServerStateDir);
+ Hydration->CreateHydrator(Config)->Dehydrate(CbObject());
+
+ auto ListModuleObjects = [&]() {
+ S3ClientOptions Opts;
+ Opts.BucketName = "zen-hydration-test";
+ Opts.Endpoint = Minio.Endpoint();
+ Opts.PathStyle = true;
+ Opts.Credentials.AccessKeyId = Minio.RootUser();
+ Opts.Credentials.SecretAccessKey = Minio.RootPassword();
+ S3Client Client(Opts);
+ return Client.ListObjects(ModuleId + "/");
+ };
+
+ // Verify objects exist in S3
+ CHECK(!ListModuleObjects().Objects.empty());
+
+ // Re-populate ServerStateDir and TempDir for cleanup verification
+ CreateSmallTestTree(ServerStateDir);
+ WriteFile(HydrationTemp / "leftover.tmp", CreateSemiRandomBlob(64));
+
+ // Obliterate
+ Hydration->CreateHydrator(Config)->Obliterate();
+
+ // Verify S3 objects deleted
+ CHECK(ListModuleObjects().Objects.empty());
+ // Local directories cleaned
+ CHECK(std::filesystem::is_empty(ServerStateDir));
+ CHECK(std::filesystem::is_empty(HydrationTemp));
+}
+
+TEST_CASE("hydration.s3.config_overrides")
{
- // Hydrate() against an empty bucket (first-boot scenario) must leave ServerStateDir empty.
- // The "No state found in S3" path goes through the error-cleanup branch, which wipes
- // ServerStateDir to ensure no partial or stale content is left for the server to start on.
MinioProcessOptions MinioOpts;
- MinioOpts.Port = 19014;
+ MinioOpts.Port = 19015;
MinioProcess Minio(MinioOpts);
Minio.SpawnMinioServer();
Minio.CreateBucket("zen-hydration-test");
@@ -1112,36 +1754,202 @@ TEST_CASE("hydration.s3.no_prior_state")
CreateDirectories(ServerStateDir);
CreateDirectories(HydrationTemp);
- // Pre-populate ServerStateDir to confirm the wipe actually runs.
- WriteFile(ServerStateDir / "stale.bin", CreateSemiRandomBlob(256));
+ // Path prefix: "s3://bucket/some/prefix" stores objects under
+ // "some/prefix/<ModuleId>/..." rather than directly under "<ModuleId>/...".
+ {
+ auto TestFiles = CreateSmallTestTree(ServerStateDir);
+
+ HydrationBase::Configuration BaseConfig;
+ {
+ std::string ConfigJson = fmt::format(
+ R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test/team/project","endpoint":"{}","path-style":true}}}})",
+ Minio.Endpoint());
+ std::string ParseError;
+ CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError);
+ ZEN_ASSERT(ParseError.empty() && Root.IsObject());
+ BaseConfig.Options = std::move(Root).AsObject();
+ }
+ auto Hydration = InitHydration(BaseConfig);
+
+ HydrationConfig Config{.ServerStateDir = ServerStateDir, .TempDir = HydrationTemp, .ModuleId = "s3test_prefix"};
- HydrationConfig Config;
- Config.ServerStateDir = ServerStateDir;
- Config.TempDir = HydrationTemp;
- Config.ModuleId = "s3test_no_prior";
- Config.TargetSpecification = "s3://zen-hydration-test";
- Config.S3Endpoint = Minio.Endpoint();
- Config.S3PathStyle = true;
+ Hydration->CreateHydrator(Config)->Dehydrate(CbObject());
- std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
- Hydrator->Hydrate();
+ CleanDirectory(ServerStateDir, true);
+
+ Hydration->CreateHydrator(Config)->Hydrate();
+
+ VerifyTree(ServerStateDir, TestFiles);
+ }
+
+ // Region override: 'region' in Options["settings"] takes precedence over AWS_DEFAULT_REGION.
+ // AWS_DEFAULT_REGION is set to a bogus value; hydration must succeed using the region from Options.
+ {
+ CleanDirectory(ServerStateDir, true);
+ auto TestFiles = CreateSmallTestTree(ServerStateDir);
- // ServerStateDir must be empty: the error path wipes it to prevent a server start
- // against stale or partially-installed content.
+ ScopedEnvVar EnvRegion("AWS_DEFAULT_REGION", "wrong-region");
+
+ HydrationBase::Configuration BaseConfig;
+ {
+ std::string ConfigJson = fmt::format(
+ R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test","endpoint":"{}","path-style":true,"region":"us-east-1"}}}})",
+ Minio.Endpoint());
+ std::string ParseError;
+ CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError);
+ ZEN_ASSERT(ParseError.empty() && Root.IsObject());
+ BaseConfig.Options = std::move(Root).AsObject();
+ }
+ auto Hydration = InitHydration(BaseConfig);
+
+ HydrationConfig Config{.ServerStateDir = ServerStateDir, .TempDir = HydrationTemp, .ModuleId = "s3test_region_override"};
+
+ Hydration->CreateHydrator(Config)->Dehydrate(CbObject());
+
+ CleanDirectory(ServerStateDir, true);
+
+ Hydration->CreateHydrator(Config)->Hydrate();
+
+ VerifyTree(ServerStateDir, TestFiles);
+ }
+}
+
+TEST_CASE("hydration.s3.dehydrate_hydrate.performance" * doctest::skip())
+{
+ MinioProcessOptions MinioOpts;
+ MinioOpts.Port = 19010;
+ MinioProcess Minio(MinioOpts);
+ Minio.SpawnMinioServer();
+ Minio.CreateBucket("zen-hydration-test");
+
+ ScopedEnvVar EnvAccessKey("AWS_ACCESS_KEY_ID", Minio.RootUser());
+ ScopedEnvVar EnvSecretKey("AWS_SECRET_ACCESS_KEY", Minio.RootPassword());
+
+ ScopedTemporaryDirectory TempDir;
+
+ std::filesystem::path ServerStateDir = TempDir.Path() / "server_state";
+ std::filesystem::path HydrationTemp = TempDir.Path() / "hydration_temp";
+ CreateDirectories(ServerStateDir);
+ CreateDirectories(HydrationTemp);
+
+ const std::string ModuleId = "s3test_performance";
+ CopyTree("E:\\Dev\\hub\\brainrot\\20260402-225355-508", ServerStateDir, {.EnableClone = true});
+ // auto TestFiles = CreateTestTree(ServerStateDir);
+
+ TestThreading Threading(4);
+
+ HydrationBase::Configuration BaseConfig;
+ {
+ std::string ConfigJson =
+ fmt::format(R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test","endpoint":"{}","path-style":true}}}})",
+ Minio.Endpoint());
+ std::string ParseError;
+ CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError);
+ ZEN_ASSERT(ParseError.empty() && Root.IsObject());
+ BaseConfig.Options = std::move(Root).AsObject();
+ }
+ auto Hydration = InitHydration(BaseConfig);
+
+ HydrationConfig Config{.ServerStateDir = ServerStateDir,
+ .TempDir = HydrationTemp,
+ .ModuleId = ModuleId,
+ .Threading = Threading.Options};
+
+ // Dehydrate: upload server state to MinIO
+ ZEN_INFO("============== DEHYDRATE ==============");
+ Hydration->CreateHydrator(Config)->Dehydrate(CbObject());
+
+ for (size_t I = 0; I < 1; I++)
+ {
+ // Wipe server state
+ CleanDirectory(ServerStateDir, true);
+ CHECK(std::filesystem::is_empty(ServerStateDir));
+
+ // Hydrate: download from MinIO back to server state
+ ZEN_INFO("=============== HYDRATE ===============");
+ Hydration->CreateHydrator(Config)->Hydrate();
+ }
+}
+
+//#define REAL_DATA_PATH "E:\\Dev\\hub\\zenddc\\Zen"
+//#define REAL_DATA_PATH "E:\\Dev\\hub\\brainrot\\20260402-225355-508"
+
+TEST_CASE("hydration.file.incremental")
+{
+ std::filesystem::path TmpPath;
+# ifdef REAL_DATA_PATH
+ TmpPath = std::filesystem::path(REAL_DATA_PATH).parent_path() / "hub";
+# endif
+ ScopedTemporaryDirectory TempDir(TmpPath);
+
+ std::filesystem::path ServerStateDir = TempDir.Path() / "server_state";
+ std::filesystem::path HydrationStore = TempDir.Path() / "hydration_store";
+ std::filesystem::path HydrationTemp = TempDir.Path() / "hydration_temp";
+ CreateDirectories(ServerStateDir);
+ CreateDirectories(HydrationStore);
+ CreateDirectories(HydrationTemp);
+
+ const std::string ModuleId = "testmodule";
+ // auto TestFiles = CreateTestTree(ServerStateDir);
+
+ TestThreading Threading(4);
+
+ auto Hydration = InitHydration({.TargetSpecification = "file://" + HydrationStore.string()});
+
+ HydrationConfig Config{.ServerStateDir = ServerStateDir,
+ .TempDir = HydrationTemp,
+ .ModuleId = ModuleId,
+ .Threading = Threading.Options};
+
+ // Hydrate with no prior state
+ CbObject HydrationState = Hydration->CreateHydrator(Config)->Hydrate();
+ CHECK_FALSE(HydrationState);
+
+# ifdef REAL_DATA_PATH
+ ZEN_INFO("Writing state data...");
+ CopyTree(REAL_DATA_PATH, ServerStateDir, {.EnableClone = true});
+ ZEN_INFO("Writing state data complete");
+# else
+ // Create test files and dehydrate
+ auto TestFiles = CreateTestTree(ServerStateDir);
+# endif
+ Hydration->CreateHydrator(Config)->Dehydrate(HydrationState);
CHECK(std::filesystem::is_empty(ServerStateDir));
+
+ // Hydrate: restore from file store
+ HydrationState = Hydration->CreateHydrator(Config)->Hydrate();
+# ifndef REAL_DATA_PATH
+ VerifyTree(ServerStateDir, TestFiles);
+# endif
+ // Dehydrate again with cached state (should skip re-uploading unchanged files)
+ Hydration->CreateHydrator(Config)->Dehydrate(HydrationState);
+ CHECK(std::filesystem::is_empty(ServerStateDir));
+
+ // Hydrate one more time to confirm second dehydrate produced valid state
+ HydrationState = Hydration->CreateHydrator(Config)->Hydrate();
+
+ // Replace files and dehydrate
+ TestFiles = CreateTestTree(ServerStateDir);
+ Hydration->CreateHydrator(Config)->Dehydrate(HydrationState);
+
+ // Hydrate one more time to confirm second dehydrate produced valid state
+ HydrationState = Hydration->CreateHydrator(Config)->Hydrate();
+# ifndef REAL_DATA_PATH
+ VerifyTree(ServerStateDir, TestFiles);
+# endif // 0
+
+ // Dehydrate, nothing touched - no hashing, no upload
+ Hydration->CreateHydrator(Config)->Dehydrate(HydrationState);
}
// ---------------------------------------------------------------------------
-// S3Hydrator: bucket path prefix in TargetSpecification
+// S3Storage test
// ---------------------------------------------------------------------------
-TEST_CASE("hydration.s3.path_prefix")
+TEST_CASE("hydration.s3.incremental")
{
- // TargetSpecification of the form "s3://bucket/some/prefix" stores objects under
- // "some/prefix/<ModuleId>/..." rather than directly under "<ModuleId>/...".
- // Tests the second branch of the m_KeyPrefix calculation in S3Hydrator::Configure().
MinioProcessOptions MinioOpts;
- MinioOpts.Port = 19015;
+ MinioOpts.Port = 19017;
MinioProcess Minio(MinioOpts);
Minio.SpawnMinioServer();
Minio.CreateBucket("zen-hydration-test");
@@ -1149,36 +1957,95 @@ TEST_CASE("hydration.s3.path_prefix")
ScopedEnvVar EnvAccessKey("AWS_ACCESS_KEY_ID", Minio.RootUser());
ScopedEnvVar EnvSecretKey("AWS_SECRET_ACCESS_KEY", Minio.RootPassword());
- ScopedTemporaryDirectory TempDir;
+ std::filesystem::path TmpPath;
+# ifdef REAL_DATA_PATH
+ TmpPath = std::filesystem::path(REAL_DATA_PATH).parent_path() / "hub";
+# endif
+ ScopedTemporaryDirectory TempDir(TmpPath);
std::filesystem::path ServerStateDir = TempDir.Path() / "server_state";
std::filesystem::path HydrationTemp = TempDir.Path() / "hydration_temp";
CreateDirectories(ServerStateDir);
CreateDirectories(HydrationTemp);
- std::vector<std::pair<std::filesystem::path, IoBuffer>> TestFiles = CreateTestTree(ServerStateDir);
+ const std::string ModuleId = "s3test_incremental";
- HydrationConfig Config;
- Config.ServerStateDir = ServerStateDir;
- Config.TempDir = HydrationTemp;
- Config.ModuleId = "s3test_prefix";
- Config.TargetSpecification = "s3://zen-hydration-test/team/project";
- Config.S3Endpoint = Minio.Endpoint();
- Config.S3PathStyle = true;
+ TestThreading Threading(8);
+ HydrationBase::Configuration BaseConfig;
{
- std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
- Hydrator->Dehydrate();
+ std::string ConfigJson =
+ fmt::format(R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test","endpoint":"{}","path-style":true}}}})",
+ Minio.Endpoint());
+ std::string ParseError;
+ CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError);
+ ZEN_ASSERT(ParseError.empty() && Root.IsObject());
+ BaseConfig.Options = std::move(Root).AsObject();
}
+ auto Hydration = InitHydration(BaseConfig);
- CleanDirectory(ServerStateDir, true);
+ HydrationConfig Config{.ServerStateDir = ServerStateDir,
+ .TempDir = HydrationTemp,
+ .ModuleId = ModuleId,
+ .Threading = Threading.Options};
+
+ // Hydrate with no prior state
+ CbObject HydrationState = Hydration->CreateHydrator(Config)->Hydrate();
+ CHECK_FALSE(HydrationState);
+
+# ifdef REAL_DATA_PATH
+ ZEN_INFO("Writing state data...");
+ CopyTree(REAL_DATA_PATH, ServerStateDir, {.EnableClone = true});
+ ZEN_INFO("Writing state data complete");
+# else
+ // Create test files and dehydrate
+ auto TestFiles = CreateTestTree(ServerStateDir);
+# endif
+ Hydration->CreateHydrator(Config)->Dehydrate(HydrationState);
+ CHECK(std::filesystem::is_empty(ServerStateDir));
+
+ // Hydrate: restore from S3
+ HydrationState = Hydration->CreateHydrator(Config)->Hydrate();
+# ifndef REAL_DATA_PATH
+ VerifyTree(ServerStateDir, TestFiles);
+# endif
+ // Dehydrate again with cached state (should skip re-uploading unchanged files)
+ Hydration->CreateHydrator(Config)->Dehydrate(HydrationState);
+ CHECK(std::filesystem::is_empty(ServerStateDir));
+ // Hydrate one more time to confirm second dehydrate produced valid state
+ HydrationState = Hydration->CreateHydrator(Config)->Hydrate();
+
+ // Replace files and dehydrate
+ TestFiles = CreateTestTree(ServerStateDir);
+ Hydration->CreateHydrator(Config)->Dehydrate(HydrationState);
+
+ // Hydrate one more time to confirm second dehydrate produced valid state
+ HydrationState = Hydration->CreateHydrator(Config)->Hydrate();
+
+# ifndef REAL_DATA_PATH
+ VerifyTree(ServerStateDir, TestFiles);
+# endif // 0
+
+ // Dehydrate, nothing touched - no hashing, no upload
+ Hydration->CreateHydrator(Config)->Dehydrate(HydrationState);
+}
+
+TEST_CASE("hydration.create_hydrator_rejects_invalid_config")
+{
+ // Unknown TargetSpecification prefix
+ CHECK_THROWS(InitHydration({.TargetSpecification = "ftp://somewhere"}));
+
+ // Unknown Options type
{
- std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
- Hydrator->Hydrate();
+ std::string ParseError;
+ CbFieldIterator Root = LoadCompactBinaryFromJson(R"({"type":"dynamodb"})", ParseError);
+ ZEN_ASSERT(ParseError.empty() && Root.IsObject());
+ CHECK_THROWS(InitHydration({.Options = std::move(Root).AsObject()}));
}
- VerifyTree(ServerStateDir, TestFiles);
+ // Empty Options (no type field)
+ CHECK_THROWS(InitHydration({}));
}
TEST_SUITE_END();
diff --git a/src/zenserver/hub/hydration.h b/src/zenserver/hub/hydration.h
index d29ffe5c0..0455dda91 100644
--- a/src/zenserver/hub/hydration.h
+++ b/src/zenserver/hub/hydration.h
@@ -2,10 +2,18 @@
#pragma once
+#include <zencore/compactbinary.h>
+
+#include <atomic>
#include <filesystem>
+#include <memory>
+#include <optional>
+#include <string>
namespace zen {
+class WorkerThreadPool;
+
struct HydrationConfig
{
// Location of server state to hydrate/dehydrate
@@ -14,14 +22,16 @@ struct HydrationConfig
std::filesystem::path TempDir;
// Module ID of the server state being hydrated/dehydrated
std::string ModuleId;
- // Back-end specific target specification (e.g. S3 bucket, file path, etc)
- std::string TargetSpecification;
-
- // Optional S3 endpoint override (e.g. "http://localhost:9000" for MinIO).
- std::string S3Endpoint;
- // Use path-style S3 URLs (endpoint/bucket/key) instead of virtual-hosted-style
- // (bucket.endpoint/key). Required for MinIO and other non-AWS endpoints.
- bool S3PathStyle = false;
+
+ struct ThreadingOptions
+ {
+ WorkerThreadPool* WorkerPool = nullptr;
+ std::atomic<bool>* AbortFlag = nullptr;
+ std::atomic<bool>* PauseFlag = nullptr;
+ };
+
+ // External threading for parallel I/O and hashing. If not set, work runs inline on the caller's thread.
+ std::optional<ThreadingOptions> Threading;
};
/**
@@ -30,18 +40,53 @@ struct HydrationConfig
* An instance of this interface is used to perform hydration OR
* dehydration of server state. It's expected to be used only once
* and not reused.
- *
*/
struct HydrationStrategyBase
{
virtual ~HydrationStrategyBase() = default;
- virtual void Dehydrate() = 0;
- virtual void Hydrate() = 0;
- virtual void Configure(const HydrationConfig& Config) = 0;
+ // Upload server state to the configured target. ServerStateDir is wiped on success.
+ // On failure, ServerStateDir is left intact.
+ virtual void Dehydrate(const CbObject& CachedState) = 0;
+
+ // Download state from the configured target into ServerStateDir. Returns cached state for the next Dehydrate.
+ // On failure, ServerStateDir is wiped and an empty CbObject is returned.
+ virtual CbObject Hydrate() = 0;
+
+ // Delete all stored data for this module from the configured backend, then clean ServerStateDir and TempDir.
+ virtual void Obliterate() = 0;
+};
+
+/**
+ * @brief Hub-wide hydration backend
+ *
+ * Constructed once per hub via InitHydration. Holds the shared connection / client /
+ * credentials state for the configured backend (e.g. a single S3 client and IMDS
+ * credential provider shared by all modules). CreateHydrator produces a ready-to-use
+ * per-module HydrationStrategyBase that references the shared state - no per-module
+ * backend setup cost.
+ */
+class HydrationBase
+{
+public:
+ struct Configuration
+ {
+ // Back-end specific target specification (e.g. "s3://bucket/prefix", "file:///path")
+ std::string TargetSpecification;
+ // Full config object (mutually exclusive with TargetSpecification)
+ CbObject Options;
+ };
+
+ virtual ~HydrationBase() = default;
+
+ // Create a configured per-module hydrator, ready to call Hydrate/Dehydrate/Obliterate.
+ virtual std::unique_ptr<HydrationStrategyBase> CreateHydrator(const HydrationConfig& Config) = 0;
};
-std::unique_ptr<HydrationStrategyBase> CreateHydrator(const HydrationConfig& Config);
+// Factory: parses Config and returns the concrete backend (FileHydration or S3Hydration).
+// Throws zen::runtime_error if the config cannot be resolved to a known backend or if
+// backend-specific validation fails.
+std::unique_ptr<HydrationBase> InitHydration(const HydrationBase::Configuration& Config);
#if ZEN_WITH_TESTS
void hydration_forcelink();
diff --git a/src/zenserver/hub/storageserverinstance.cpp b/src/zenserver/hub/storageserverinstance.cpp
index 6b139dbf1..97edc5223 100644
--- a/src/zenserver/hub/storageserverinstance.cpp
+++ b/src/zenserver/hub/storageserverinstance.cpp
@@ -11,13 +11,15 @@
namespace zen {
-StorageServerInstance::StorageServerInstance(ZenServerEnvironment& RunEnvironment, const Configuration& Config, std::string_view ModuleId)
-: m_Config(Config)
+StorageServerInstance::StorageServerInstance(ZenServerEnvironment& RunEnvironment,
+ HydrationBase& Hydration,
+ const Configuration& Config,
+ std::string_view ModuleId)
+: m_Hydration(Hydration)
+, m_Config(Config)
, m_ModuleId(ModuleId)
, m_ServerInstance(RunEnvironment, ZenServerInstance::ServerMode::kStorageServer)
{
- m_BaseDir = RunEnvironment.CreateChildDir(ModuleId);
- m_TempDir = Config.HydrationTempPath / ModuleId;
}
StorageServerInstance::~StorageServerInstance()
@@ -31,7 +33,7 @@ StorageServerInstance::SpawnServerProcess()
m_ServerInstance.ResetDeadProcess();
m_ServerInstance.SetServerExecutablePath(GetRunningExecutablePath());
- m_ServerInstance.SetDataDir(m_BaseDir);
+ m_ServerInstance.SetDataDir(m_Config.StateDir);
#if ZEN_PLATFORM_WINDOWS
m_ServerInstance.SetJobObject(m_JobObject);
#endif
@@ -50,6 +52,36 @@ StorageServerInstance::SpawnServerProcess()
{
AdditionalOptions << " --config=\"" << MakeSafeAbsolutePath(m_Config.ConfigPath).string() << "\"";
}
+ if (!m_Config.Malloc.empty())
+ {
+ AdditionalOptions << " --malloc=" << m_Config.Malloc;
+ }
+ if (!m_Config.Trace.empty())
+ {
+ AdditionalOptions << " --trace=" << m_Config.Trace;
+ }
+ if (!m_Config.TraceHost.empty())
+ {
+ AdditionalOptions << " --tracehost=" << m_Config.TraceHost;
+ }
+ if (!m_Config.TraceFile.empty())
+ {
+ constexpr std::string_view ModuleIdPattern = "{moduleid}";
+ constexpr std::string_view PortPattern = "{port}";
+
+ std::string ResolvedTraceFile = m_Config.TraceFile;
+ for (size_t Pos = ResolvedTraceFile.find(ModuleIdPattern); Pos != std::string::npos;
+ Pos = ResolvedTraceFile.find(ModuleIdPattern, Pos))
+ {
+ ResolvedTraceFile.replace(Pos, ModuleIdPattern.length(), m_ModuleId);
+ }
+ std::string PortStr = fmt::format("{}", m_Config.BasePort);
+ for (size_t Pos = ResolvedTraceFile.find(PortPattern); Pos != std::string::npos; Pos = ResolvedTraceFile.find(PortPattern, Pos))
+ {
+ ResolvedTraceFile.replace(Pos, PortPattern.length(), PortStr);
+ }
+ AdditionalOptions << " --tracefile=\"" << ResolvedTraceFile << "\"";
+ }
m_ServerInstance.SpawnServerAndWaitUntilReady(m_Config.BasePort, AdditionalOptions.ToView());
ZEN_DEBUG("Storage server instance for module '{}' started, listening on port {}", m_ModuleId, m_Config.BasePort);
@@ -57,16 +89,15 @@ StorageServerInstance::SpawnServerProcess()
m_ServerInstance.EnableShutdownOnDestroy();
}
-void
-StorageServerInstance::GetProcessMetrics(ProcessMetrics& OutMetrics) const
+ProcessMetrics
+StorageServerInstance::GetProcessMetrics() const
{
- OutMetrics.MemoryBytes = m_MemoryBytes.load();
- OutMetrics.KernelTimeMs = m_KernelTimeMs.load();
- OutMetrics.UserTimeMs = m_UserTimeMs.load();
- OutMetrics.WorkingSetSize = m_WorkingSetSize.load();
- OutMetrics.PeakWorkingSetSize = m_PeakWorkingSetSize.load();
- OutMetrics.PagefileUsage = m_PagefileUsage.load();
- OutMetrics.PeakPagefileUsage = m_PeakPagefileUsage.load();
+ ProcessMetrics Metrics;
+ if (m_ServerInstance.IsRunning())
+ {
+ zen::GetProcessMetrics(m_ServerInstance.GetProcessHandle(), Metrics);
+ }
+ return Metrics;
}
void
@@ -78,7 +109,7 @@ StorageServerInstance::ProvisionLocked()
return;
}
- ZEN_INFO("Provisioning storage server instance for module '{}', at '{}'", m_ModuleId, m_BaseDir);
+ ZEN_INFO("Provisioning storage server instance for module '{}', at '{}'", m_ModuleId, m_Config.StateDir);
try
{
Hydrate();
@@ -88,7 +119,7 @@ StorageServerInstance::ProvisionLocked()
{
ZEN_WARN("Failed spawning server instance for module '{}', at '{}' during provisioning. Reason: {}",
m_ModuleId,
- m_BaseDir,
+ m_Config.StateDir,
Ex.what());
throw;
}
@@ -118,6 +149,22 @@ StorageServerInstance::DeprovisionLocked()
}
void
+StorageServerInstance::ObliterateLocked()
+{
+ if (m_ServerInstance.IsRunning())
+ {
+ // m_ServerInstance.Shutdown() never throws.
+ m_ServerInstance.Shutdown();
+ }
+
+ std::atomic<bool> AbortFlag{false};
+ std::atomic<bool> PauseFlag{false};
+ HydrationConfig Config = MakeHydrationConfig(AbortFlag, PauseFlag);
+ std::unique_ptr<HydrationStrategyBase> Hydrator = m_Hydration.CreateHydrator(Config);
+ Hydrator->Obliterate();
+}
+
+void
StorageServerInstance::HibernateLocked()
{
// Signal server to shut down, but keep data around for later wake
@@ -147,7 +194,10 @@ StorageServerInstance::WakeLocked()
}
catch (const std::exception& Ex)
{
- ZEN_WARN("Failed spawning server instance for module '{}', at '{}' during waking. Reason: {}", m_ModuleId, m_BaseDir, Ex.what());
+ ZEN_WARN("Failed spawning server instance for module '{}', at '{}' during waking. Reason: {}",
+ m_ModuleId,
+ m_Config.StateDir,
+ Ex.what());
throw;
}
}
@@ -155,27 +205,34 @@ StorageServerInstance::WakeLocked()
void
StorageServerInstance::Hydrate()
{
- HydrationConfig Config{.ServerStateDir = m_BaseDir,
- .TempDir = m_TempDir,
- .ModuleId = m_ModuleId,
- .TargetSpecification = m_Config.HydrationTargetSpecification};
-
- std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
-
- Hydrator->Hydrate();
+ std::atomic<bool> AbortFlag{false};
+ std::atomic<bool> PauseFlag{false};
+ HydrationConfig Config = MakeHydrationConfig(AbortFlag, PauseFlag);
+ std::unique_ptr<HydrationStrategyBase> Hydrator = m_Hydration.CreateHydrator(Config);
+ m_HydrationState = Hydrator->Hydrate();
}
void
StorageServerInstance::Dehydrate()
{
- HydrationConfig Config{.ServerStateDir = m_BaseDir,
- .TempDir = m_TempDir,
- .ModuleId = m_ModuleId,
- .TargetSpecification = m_Config.HydrationTargetSpecification};
+ std::atomic<bool> AbortFlag{false};
+ std::atomic<bool> PauseFlag{false};
+ HydrationConfig Config = MakeHydrationConfig(AbortFlag, PauseFlag);
+ std::unique_ptr<HydrationStrategyBase> Hydrator = m_Hydration.CreateHydrator(Config);
+ Hydrator->Dehydrate(m_HydrationState);
+}
- std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
+HydrationConfig
+StorageServerInstance::MakeHydrationConfig(std::atomic<bool>& AbortFlag, std::atomic<bool>& PauseFlag)
+{
+ HydrationConfig Config{.ServerStateDir = m_Config.StateDir, .TempDir = m_Config.TempDir, .ModuleId = m_ModuleId};
+ if (m_Config.OptionalWorkerPool)
+ {
+ Config.Threading.emplace(
+ HydrationConfig::ThreadingOptions{.WorkerPool = m_Config.OptionalWorkerPool, .AbortFlag = &AbortFlag, .PauseFlag = &PauseFlag});
+ }
- Hydrator->Dehydrate();
+ return Config;
}
StorageServerInstance::SharedLockedPtr::SharedLockedPtr() : m_Lock(nullptr), m_Instance(nullptr)
@@ -249,25 +306,6 @@ StorageServerInstance::SharedLockedPtr::IsRunning() const
return m_Instance->m_ServerInstance.IsRunning();
}
-void
-StorageServerInstance::UpdateMetricsLocked()
-{
- if (m_ServerInstance.IsRunning())
- {
- ProcessMetrics Metrics;
- zen::GetProcessMetrics(m_ServerInstance.GetProcessHandle(), Metrics);
-
- m_MemoryBytes.store(Metrics.MemoryBytes);
- m_KernelTimeMs.store(Metrics.KernelTimeMs);
- m_UserTimeMs.store(Metrics.UserTimeMs);
- m_WorkingSetSize.store(Metrics.WorkingSetSize);
- m_PeakWorkingSetSize.store(Metrics.PeakWorkingSetSize);
- m_PagefileUsage.store(Metrics.PagefileUsage);
- m_PeakPagefileUsage.store(Metrics.PeakPagefileUsage);
- }
- // TODO: Resource metrics...
-}
-
#if ZEN_WITH_TESTS
void
StorageServerInstance::SharedLockedPtr::TerminateForTesting() const
@@ -363,6 +401,13 @@ StorageServerInstance::ExclusiveLockedPtr::Deprovision()
}
void
+StorageServerInstance::ExclusiveLockedPtr::Obliterate()
+{
+ ZEN_ASSERT(m_Instance != nullptr);
+ m_Instance->ObliterateLocked();
+}
+
+void
StorageServerInstance::ExclusiveLockedPtr::Hibernate()
{
ZEN_ASSERT(m_Instance != nullptr);
diff --git a/src/zenserver/hub/storageserverinstance.h b/src/zenserver/hub/storageserverinstance.h
index 94c47630c..c5917afc9 100644
--- a/src/zenserver/hub/storageserverinstance.h
+++ b/src/zenserver/hub/storageserverinstance.h
@@ -2,8 +2,9 @@
#pragma once
-#include "resourcemetrics.h"
+#include "hydration.h"
+#include <zencore/compactbinary.h>
#include <zenutil/zenserverprocess.h>
#include <atomic>
@@ -11,6 +12,8 @@
namespace zen {
+class WorkerThreadPool;
+
/**
* Storage Server Instance
*
@@ -24,21 +27,28 @@ public:
struct Configuration
{
uint16_t BasePort;
- std::filesystem::path HydrationTempPath;
- std::string HydrationTargetSpecification;
+ std::filesystem::path StateDir;
+ std::filesystem::path TempDir;
uint32_t HttpThreadCount = 0; // Automatic
int CoreLimit = 0; // Automatic
std::filesystem::path ConfigPath;
+ std::string Malloc;
+ std::string Trace;
+ std::string TraceHost;
+ std::string TraceFile;
+
+ WorkerThreadPool* OptionalWorkerPool = nullptr;
};
- StorageServerInstance(ZenServerEnvironment& RunEnvironment, const Configuration& Config, std::string_view ModuleId);
+ StorageServerInstance(ZenServerEnvironment& RunEnvironment,
+ HydrationBase& Hydration,
+ const Configuration& Config,
+ std::string_view ModuleId);
~StorageServerInstance();
- const ResourceMetrics& GetResourceMetrics() const { return m_ResourceMetrics; }
-
inline std::string_view GetModuleId() const { return m_ModuleId; }
inline uint16_t GetBasePort() const { return m_Config.BasePort; }
- void GetProcessMetrics(ProcessMetrics& OutMetrics) const;
+ ProcessMetrics GetProcessMetrics() const;
#if ZEN_PLATFORM_WINDOWS
void SetJobObject(JobObject* InJobObject) { m_JobObject = InJobObject; }
@@ -68,15 +78,10 @@ public:
}
bool IsRunning() const;
- const ResourceMetrics& GetResourceMetrics() const
+ ProcessMetrics GetProcessMetrics() const
{
ZEN_ASSERT(m_Instance);
- return m_Instance->m_ResourceMetrics;
- }
- void UpdateMetrics()
- {
- ZEN_ASSERT(m_Instance);
- return m_Instance->UpdateMetricsLocked();
+ return m_Instance->GetProcessMetrics();
}
#if ZEN_WITH_TESTS
@@ -114,14 +119,9 @@ public:
}
bool IsRunning() const;
- const ResourceMetrics& GetResourceMetrics() const
- {
- ZEN_ASSERT(m_Instance);
- return m_Instance->m_ResourceMetrics;
- }
-
void Provision();
void Deprovision();
+ void Obliterate();
void Hibernate();
void Wake();
@@ -135,29 +135,18 @@ public:
private:
void ProvisionLocked();
void DeprovisionLocked();
+ void ObliterateLocked();
void HibernateLocked();
void WakeLocked();
- void UpdateMetricsLocked();
-
mutable RwLock m_Lock;
+ HydrationBase& m_Hydration;
const Configuration m_Config;
std::string m_ModuleId;
ZenServerInstance m_ServerInstance;
- std::filesystem::path m_BaseDir;
-
- std::filesystem::path m_TempDir;
- ResourceMetrics m_ResourceMetrics;
-
- std::atomic<uint64_t> m_MemoryBytes = 0;
- std::atomic<uint64_t> m_KernelTimeMs = 0;
- std::atomic<uint64_t> m_UserTimeMs = 0;
- std::atomic<uint64_t> m_WorkingSetSize = 0;
- std::atomic<uint64_t> m_PeakWorkingSetSize = 0;
- std::atomic<uint64_t> m_PagefileUsage = 0;
- std::atomic<uint64_t> m_PeakPagefileUsage = 0;
+ CbObject m_HydrationState;
#if ZEN_PLATFORM_WINDOWS
JobObject* m_JobObject = nullptr;
@@ -165,8 +154,9 @@ private:
void SpawnServerProcess();
- void Hydrate();
- void Dehydrate();
+ void Hydrate();
+ void Dehydrate();
+ HydrationConfig MakeHydrationConfig(std::atomic<bool>& AbortFlag, std::atomic<bool>& PauseFlag);
friend class SharedLockedPtr;
friend class ExclusiveLockedPtr;
diff --git a/src/zenserver/hub/zenhubserver.cpp b/src/zenserver/hub/zenhubserver.cpp
index 314031246..ebc2cf2f1 100644
--- a/src/zenserver/hub/zenhubserver.cpp
+++ b/src/zenserver/hub/zenhubserver.cpp
@@ -2,21 +2,29 @@
#include "zenhubserver.h"
+#include "config/luaconfig.h"
#include "frontend/frontend.h"
#include "httphubservice.h"
+#include "httpproxyhandler.h"
#include "hub.h"
+#include <zencore/compactbinary.h>
#include <zencore/config.h>
+#include <zencore/except.h>
+#include <zencore/except_fmt.h>
+#include <zencore/filesystem.h>
#include <zencore/fmtutils.h>
+#include <zencore/intmath.h>
#include <zencore/memory/llm.h>
#include <zencore/memory/memorytrace.h>
#include <zencore/memory/tagtrace.h>
#include <zencore/scopeguard.h>
#include <zencore/sentryintegration.h>
+#include <zencore/system.h>
+#include <zencore/thread.h>
#include <zencore/windows.h>
#include <zenhttp/httpapiservice.h>
#include <zenutil/service.h>
-#include <zenutil/workerpools.h>
ZEN_THIRD_PARTY_INCLUDES_START
#include <cxxopts.hpp>
@@ -53,12 +61,19 @@ ZenHubServerConfigurator::AddCliOptions(cxxopts::Options& Options)
Options.add_option("hub",
"",
"instance-id",
- "Instance ID for use in notifications",
+ "Instance ID for use in notifications (deprecated, use --upstream-notification-instance-id)",
cxxopts::value<std::string>(m_ServerOptions.InstanceId)->default_value(""),
"");
Options.add_option("hub",
"",
+ "upstream-notification-instance-id",
+ "Instance ID for use in notifications",
+ cxxopts::value<std::string>(m_ServerOptions.InstanceId),
+ "");
+
+ Options.add_option("hub",
+ "",
"consul-endpoint",
"Consul endpoint URL for service registration (empty = disabled)",
cxxopts::value<std::string>(m_ServerOptions.ConsulEndpoint)->default_value(""),
@@ -88,13 +103,27 @@ ZenHubServerConfigurator::AddCliOptions(cxxopts::Options& Options)
Options.add_option("hub",
"",
+ "consul-register-hub",
+ "Register the hub parent service with Consul (instance registration is unaffected)",
+ cxxopts::value<bool>(m_ServerOptions.ConsulRegisterHub)->default_value("true"),
+ "");
+
+ Options.add_option("hub",
+ "",
"hub-base-port-number",
- "Base port number for provisioned instances",
+ "Base port number for provisioned instances (deprecated, use --hub-instance-base-port-number)",
cxxopts::value<uint16_t>(m_ServerOptions.HubBasePortNumber)->default_value("21000"),
"");
Options.add_option("hub",
"",
+ "hub-instance-base-port-number",
+ "Base port number for provisioned instances",
+ cxxopts::value<uint16_t>(m_ServerOptions.HubBasePortNumber),
+ "");
+
+ Options.add_option("hub",
+ "",
"hub-instance-limit",
"Maximum number of provisioned instances for this hub",
cxxopts::value<int>(m_ServerOptions.HubInstanceLimit)->default_value("1000"),
@@ -113,6 +142,34 @@ ZenHubServerConfigurator::AddCliOptions(cxxopts::Options& Options)
Options.add_option("hub",
"",
+ "hub-instance-malloc",
+ "Select memory allocator for provisioned instances (ansi|stomp|rpmalloc|mimalloc)",
+ cxxopts::value<std::string>(m_ServerOptions.HubInstanceMalloc)->default_value(""),
+ "<allocator>");
+
+ Options.add_option("hub",
+ "",
+ "hub-instance-trace",
+ "Trace channel specification for provisioned instances (e.g. default, cpu,log, memory)",
+ cxxopts::value<std::string>(m_ServerOptions.HubInstanceTrace)->default_value(""),
+ "<channels>");
+
+ Options.add_option("hub",
+ "",
+ "hub-instance-tracehost",
+ "Trace host for provisioned instances",
+ cxxopts::value<std::string>(m_ServerOptions.HubInstanceTraceHost)->default_value(""),
+ "<host>");
+
+ Options.add_option("hub",
+ "",
+ "hub-instance-tracefile",
+ "Trace file path for provisioned instances",
+ cxxopts::value<std::string>(m_ServerOptions.HubInstanceTraceFile)->default_value(""),
+ "<path>");
+
+ Options.add_option("hub",
+ "",
"hub-instance-http-threads",
"Number of http server connection threads for provisioned instances",
cxxopts::value<unsigned int>(m_ServerOptions.HubInstanceHttpThreadCount),
@@ -131,6 +188,16 @@ ZenHubServerConfigurator::AddCliOptions(cxxopts::Options& Options)
cxxopts::value(m_ServerOptions.HubInstanceConfigPath),
"<instance config>");
+ const uint32_t DefaultHubInstanceProvisionThreadCount = Max(GetHardwareConcurrency() / 4u, 2u);
+
+ Options.add_option("hub",
+ "",
+ "hub-instance-provision-threads",
+ fmt::format("Number of threads for instance provisioning (default {})", DefaultHubInstanceProvisionThreadCount),
+ cxxopts::value<uint32_t>(m_ServerOptions.HubInstanceProvisionThreadCount)
+ ->default_value(fmt::format("{}", DefaultHubInstanceProvisionThreadCount)),
+ "<threads>");
+
Options.add_option("hub",
"",
"hub-hydration-target-spec",
@@ -139,6 +206,24 @@ ZenHubServerConfigurator::AddCliOptions(cxxopts::Options& Options)
cxxopts::value(m_ServerOptions.HydrationTargetSpecification),
"<hydration-target-spec>");
+ Options.add_option("hub",
+ "",
+ "hub-hydration-target-config",
+ "Path to JSON file specifying the hydration target (mutually exclusive with "
+ "--hub-hydration-target-spec). Supported types: 'file', 's3'.",
+ cxxopts::value(m_ServerOptions.HydrationTargetConfigPath),
+ "<path>");
+
+ const uint32_t DefaultHubHydrationThreadCount = Max(GetHardwareConcurrency() / 4u, 2u);
+
+ Options.add_option(
+ "hub",
+ "",
+ "hub-hydration-threads",
+ fmt::format("Number of threads for hydration/dehydration (default {})", DefaultHubHydrationThreadCount),
+ cxxopts::value<uint32_t>(m_ServerOptions.HubHydrationThreadCount)->default_value(fmt::format("{}", DefaultHubHydrationThreadCount)),
+ "<threads>");
+
#if ZEN_PLATFORM_WINDOWS
Options.add_option("hub",
"",
@@ -203,12 +288,112 @@ ZenHubServerConfigurator::AddCliOptions(cxxopts::Options& Options)
"Request timeout in milliseconds for instance activity check requests",
cxxopts::value<uint32_t>(m_ServerOptions.WatchdogConfig.ActivityCheckRequestTimeoutMs)->default_value("200"),
"<ms>");
+
+ Options.add_option("hub",
+ "",
+ "hub-provision-disk-limit-bytes",
+ "Reject provisioning when used disk bytes exceed this value (0 = no limit).",
+ cxxopts::value<uint64_t>(m_ServerOptions.HubProvisionDiskLimitBytes),
+ "<bytes>");
+
+ Options.add_option("hub",
+ "",
+ "hub-provision-disk-limit-percent",
+ "Reject provisioning when used disk exceeds this percentage of total disk (0 = no limit).",
+ cxxopts::value<uint32_t>(m_ServerOptions.HubProvisionDiskLimitPercent),
+ "<percent>");
+
+ Options.add_option("hub",
+ "",
+ "hub-provision-memory-limit-bytes",
+ "Reject provisioning when used memory bytes exceed this value (0 = no limit).",
+ cxxopts::value<uint64_t>(m_ServerOptions.HubProvisionMemoryLimitBytes),
+ "<bytes>");
+
+ Options.add_option("hub",
+ "",
+ "hub-provision-memory-limit-percent",
+ "Reject provisioning when used memory exceeds this percentage of total RAM (0 = no limit).",
+ cxxopts::value<uint32_t>(m_ServerOptions.HubProvisionMemoryLimitPercent),
+ "<percent>");
}
void
ZenHubServerConfigurator::AddConfigOptions(LuaConfig::Options& Options)
{
- ZEN_UNUSED(Options);
+ using namespace std::literals;
+
+ Options.AddOption("hub.upstreamnotification.endpoint"sv,
+ m_ServerOptions.UpstreamNotificationEndpoint,
+ "upstream-notification-endpoint"sv);
+ Options.AddOption("hub.upstreamnotification.instanceid"sv, m_ServerOptions.InstanceId, "upstream-notification-instance-id"sv);
+
+ Options.AddOption("hub.consul.endpoint"sv, m_ServerOptions.ConsulEndpoint, "consul-endpoint"sv);
+ Options.AddOption("hub.consul.tokenenv"sv, m_ServerOptions.ConsulTokenEnv, "consul-token-env"sv);
+ Options.AddOption("hub.consul.healthintervalseconds"sv,
+ m_ServerOptions.ConsulHealthIntervalSeconds,
+ "consul-health-interval-seconds"sv);
+ Options.AddOption("hub.consul.deregisterafterseconds"sv,
+ m_ServerOptions.ConsulDeregisterAfterSeconds,
+ "consul-deregister-after-seconds"sv);
+ Options.AddOption("hub.consul.registerhub"sv, m_ServerOptions.ConsulRegisterHub, "consul-register-hub"sv);
+
+ Options.AddOption("hub.instance.baseportnumber"sv, m_ServerOptions.HubBasePortNumber, "hub-instance-base-port-number"sv);
+ Options.AddOption("hub.instance.http"sv, m_ServerOptions.HubInstanceHttpClass, "hub-instance-http"sv);
+ Options.AddOption("hub.instance.malloc"sv, m_ServerOptions.HubInstanceMalloc, "hub-instance-malloc"sv);
+ Options.AddOption("hub.instance.trace"sv, m_ServerOptions.HubInstanceTrace, "hub-instance-trace"sv);
+ Options.AddOption("hub.instance.tracehost"sv, m_ServerOptions.HubInstanceTraceHost, "hub-instance-tracehost"sv);
+ Options.AddOption("hub.instance.tracefile"sv, m_ServerOptions.HubInstanceTraceFile, "hub-instance-tracefile"sv);
+ Options.AddOption("hub.instance.httpthreads"sv, m_ServerOptions.HubInstanceHttpThreadCount, "hub-instance-http-threads"sv);
+ Options.AddOption("hub.instance.corelimit"sv, m_ServerOptions.HubInstanceCoreLimit, "hub-instance-corelimit"sv);
+ Options.AddOption("hub.instance.config"sv, m_ServerOptions.HubInstanceConfigPath, "hub-instance-config"sv);
+ Options.AddOption("hub.instance.limits.count"sv, m_ServerOptions.HubInstanceLimit, "hub-instance-limit"sv);
+ Options.AddOption("hub.instance.limits.disklimitbytes"sv,
+ m_ServerOptions.HubProvisionDiskLimitBytes,
+ "hub-provision-disk-limit-bytes"sv);
+ Options.AddOption("hub.instance.limits.disklimitpercent"sv,
+ m_ServerOptions.HubProvisionDiskLimitPercent,
+ "hub-provision-disk-limit-percent"sv);
+ Options.AddOption("hub.instance.limits.memorylimitbytes"sv,
+ m_ServerOptions.HubProvisionMemoryLimitBytes,
+ "hub-provision-memory-limit-bytes"sv);
+ Options.AddOption("hub.instance.limits.memorylimitpercent"sv,
+ m_ServerOptions.HubProvisionMemoryLimitPercent,
+ "hub-provision-memory-limit-percent"sv);
+ Options.AddOption("hub.instance.provisionthreads"sv,
+ m_ServerOptions.HubInstanceProvisionThreadCount,
+ "hub-instance-provision-threads"sv);
+
+ Options.AddOption("hub.hydration.targetspec"sv, m_ServerOptions.HydrationTargetSpecification, "hub-hydration-target-spec"sv);
+ Options.AddOption("hub.hydration.targetconfig"sv, m_ServerOptions.HydrationTargetConfigPath, "hub-hydration-target-config"sv);
+ Options.AddOption("hub.hydration.threads"sv, m_ServerOptions.HubHydrationThreadCount, "hub-hydration-threads"sv);
+
+ Options.AddOption("hub.watchdog.cycleintervalms"sv, m_ServerOptions.WatchdogConfig.CycleIntervalMs, "hub-watchdog-cycle-interval-ms"sv);
+ Options.AddOption("hub.watchdog.cycleprocessingbudgetms"sv,
+ m_ServerOptions.WatchdogConfig.CycleProcessingBudgetMs,
+ "hub-watchdog-cycle-processing-budget-ms"sv);
+ Options.AddOption("hub.watchdog.instancecheckthrottlems"sv,
+ m_ServerOptions.WatchdogConfig.InstanceCheckThrottleMs,
+ "hub-watchdog-instance-check-throttle-ms"sv);
+ Options.AddOption("hub.watchdog.provisionedinactivitytimeoutseconds"sv,
+ m_ServerOptions.WatchdogConfig.ProvisionedInactivityTimeoutSeconds,
+ "hub-watchdog-provisioned-inactivity-timeout-seconds"sv);
+ Options.AddOption("hub.watchdog.hibernatedinactivitytimeoutseconds"sv,
+ m_ServerOptions.WatchdogConfig.HibernatedInactivityTimeoutSeconds,
+ "hub-watchdog-hibernated-inactivity-timeout-seconds"sv);
+ Options.AddOption("hub.watchdog.inactivitycheckmarginseconds"sv,
+ m_ServerOptions.WatchdogConfig.InactivityCheckMarginSeconds,
+ "hub-watchdog-inactivity-check-margin-seconds"sv);
+ Options.AddOption("hub.watchdog.activitycheckconnecttimeoutms"sv,
+ m_ServerOptions.WatchdogConfig.ActivityCheckConnectTimeoutMs,
+ "hub-watchdog-activity-check-connect-timeout-ms"sv);
+ Options.AddOption("hub.watchdog.activitycheckrequesttimeoutms"sv,
+ m_ServerOptions.WatchdogConfig.ActivityCheckRequestTimeoutMs,
+ "hub-watchdog-activity-check-request-timeout-ms"sv);
+
+#if ZEN_PLATFORM_WINDOWS
+ Options.AddOption("hub.usejobobject"sv, m_ServerOptions.HubUseJobObject, "hub-use-job-object"sv);
+#endif
}
void
@@ -226,6 +411,28 @@ ZenHubServerConfigurator::OnConfigFileParsed(LuaConfig::Options& LuaOptions)
void
ZenHubServerConfigurator::ValidateOptions()
{
+ if (m_ServerOptions.HubProvisionDiskLimitPercent > 100)
+ {
+ throw OptionParseException(
+ fmt::format("'--hub-provision-disk-limit-percent' ({}) must be in range 0..100", m_ServerOptions.HubProvisionDiskLimitPercent),
+ {});
+ }
+ if (m_ServerOptions.HubProvisionMemoryLimitPercent > 100)
+ {
+ throw OptionParseException(fmt::format("'--hub-provision-memory-limit-percent' ({}) must be in range 0..100",
+ m_ServerOptions.HubProvisionMemoryLimitPercent),
+ {});
+ }
+ if (!m_ServerOptions.HydrationTargetSpecification.empty() && !m_ServerOptions.HydrationTargetConfigPath.empty())
+ {
+ throw OptionParseException("'--hub-hydration-target-spec' and '--hub-hydration-target-config' are mutually exclusive", {});
+ }
+ if (!m_ServerOptions.HydrationTargetConfigPath.empty() && !std::filesystem::exists(m_ServerOptions.HydrationTargetConfigPath))
+ {
+ throw OptionParseException(
+ fmt::format("'--hub-hydration-target-config': file not found: '{}'", m_ServerOptions.HydrationTargetConfigPath.string()),
+ {});
+ }
}
///////////////////////////////////////////////////////////////////////////
@@ -247,55 +454,71 @@ ZenHubServer::OnModuleStateChanged(std::string_view HubInstanceId,
HubInstanceState NewState)
{
ZEN_UNUSED(PreviousState);
- if (!m_ConsulClient)
- {
- return;
- }
- if (NewState == HubInstanceState::Provisioning || NewState == HubInstanceState::Provisioned)
+ if (NewState == HubInstanceState::Deprovisioning || NewState == HubInstanceState::Hibernating)
{
- consul::ServiceRegistrationInfo ServiceInfo{
- .ServiceId = std::string(ModuleId),
- .ServiceName = "zen-storage",
- .Port = Info.Port,
- .HealthEndpoint = "health",
- .Tags = std::vector<std::pair<std::string, std::string>>{std::make_pair("module", std::string(ModuleId)),
- std::make_pair("zen-hub", std::string(HubInstanceId)),
- std::make_pair("version", std::string(ZEN_CFG_VERSION))},
- .HealthIntervalSeconds = NewState == HubInstanceState::Provisioning
- ? 0u
- : m_ConsulHealthIntervalSeconds, // Disable health checks while not finished provisioning
- .DeregisterAfterSeconds = NewState == HubInstanceState::Provisioning
- ? 0u
- : m_ConsulDeregisterAfterSeconds}; // Disable health checks while not finished provisioning
-
- if (!m_ConsulClient->RegisterService(ServiceInfo))
+ if (Info.Port != 0)
{
- ZEN_WARN("Failed to register storage server instance for module '{}' with Consul, continuing anyway", ModuleId);
- }
- else
- {
- ZEN_INFO("Registered storage server instance for module '{}' at port {} with Consul as '{}'",
- ModuleId,
- Info.Port,
- ServiceInfo.ServiceName);
+ m_Proxy->PrunePort(Info.Port);
}
}
- else if (NewState == HubInstanceState::Unprovisioned)
+
+ if (!m_ConsulClient)
{
- if (!m_ConsulClient->DeregisterService(ModuleId))
- {
- ZEN_WARN("Failed to deregister storage server instance for module '{}' at port {} from Consul, continuing anyway",
- ModuleId,
- Info.Port);
- }
- else
- {
- ZEN_INFO("Deregistered storage server instance for module '{}' at port {} from Consul", ModuleId, Info.Port);
- }
+ return;
+ }
+
+ switch (NewState)
+ {
+ case HubInstanceState::Provisioning:
+ case HubInstanceState::Waking:
+ case HubInstanceState::Recovering:
+ case HubInstanceState::Provisioned:
+ {
+ const bool IsProvisioned = NewState == HubInstanceState::Provisioned;
+
+ consul::ServiceRegistrationInfo ServiceInfo{
+ .ServiceId = std::string(ModuleId),
+ .ServiceName = "zen-storage",
+ .Port = Info.Port,
+ .HealthEndpoint = "health",
+ .Tags = std::vector<std::pair<std::string, std::string>>{std::make_pair("module", std::string(ModuleId)),
+ std::make_pair("zen-hub", std::string(HubInstanceId)),
+ std::make_pair("version", std::string(ZEN_CFG_VERSION))},
+ .HealthIntervalSeconds = IsProvisioned ? m_ConsulHealthIntervalSeconds : 0u,
+ .DeregisterAfterSeconds = IsProvisioned ? m_ConsulDeregisterAfterSeconds : 0u,
+ .InitialStatus = IsProvisioned ? "passing" : ""};
+
+ m_ConsulClient->RegisterService(ServiceInfo);
+ ZEN_INFO("Submitted Consul registration for storage server instance for module '{}' at port {} as '{}'",
+ ModuleId,
+ Info.Port,
+ ServiceInfo.ServiceName);
+ break;
+ }
+ case HubInstanceState::Deprovisioning:
+ case HubInstanceState::Hibernating:
+ case HubInstanceState::Obliterating:
+ case HubInstanceState::Crashed:
+ case HubInstanceState::Hibernated:
+ case HubInstanceState::Unprovisioned:
+ {
+ // A Consul registration is "live" while the module is in a register-state
+ // (Provisioning / Waking / Recovering / Provisioned). Deregister once when
+ // we leave a register-state into any non-register-state
+ const bool WasRegisteredState =
+ PreviousState == HubInstanceState::Provisioning || PreviousState == HubInstanceState::Waking ||
+ PreviousState == HubInstanceState::Recovering || PreviousState == HubInstanceState::Provisioned;
+ if (WasRegisteredState)
+ {
+ m_ConsulClient->DeregisterService(ModuleId);
+ ZEN_INFO("Submitted Consul deregistration for storage server instance for module '{}' at port {}", ModuleId, Info.Port);
+ }
+ }
+ break;
+ default:
+ break;
}
- // Transitional states (Deprovisioning, Hibernating, Waking, Recovering, Crashed)
- // and Hibernated are intentionally ignored.
}
int
@@ -317,6 +540,10 @@ ZenHubServer::Initialize(const ZenHubServerConfig& ServerConfig, ZenServerState:
// the main test range.
ZenServerEnvironment::SetBaseChildId(1000);
+ m_ProvisionWorkerPool =
+ std::make_unique<WorkerThreadPool>(gsl::narrow<int>(ServerConfig.HubInstanceProvisionThreadCount), "hub_provision");
+ m_HydrationWorkerPool = std::make_unique<WorkerThreadPool>(gsl::narrow<int>(ServerConfig.HubHydrationThreadCount), "hub_hydration");
+
m_DebugOptionForcedCrash = ServerConfig.ShouldCrash;
InitializeState(ServerConfig);
@@ -342,12 +569,18 @@ ZenHubServer::Cleanup()
m_IoRunner.join();
}
- ShutdownServices();
if (m_Http)
{
m_Http->Close();
}
+ ShutdownServices();
+
+ if (m_Proxy)
+ {
+ m_Proxy->Shutdown();
+ }
+
if (m_Hub)
{
m_Hub->Shutdown();
@@ -357,6 +590,7 @@ ZenHubServer::Cleanup()
m_HubService.reset();
m_ApiService.reset();
m_Hub.reset();
+ m_Proxy.reset();
m_ConsulRegistration.reset();
m_ConsulClient.reset();
@@ -373,49 +607,121 @@ ZenHubServer::InitializeState(const ZenHubServerConfig& ServerConfig)
ZEN_UNUSED(ServerConfig);
}
+ResourceMetrics
+ZenHubServer::ResolveLimits(const ZenHubServerConfig& ServerConfig)
+{
+ uint64_t DiskTotal = 0;
+ uint64_t MemoryTotal = 0;
+
+ if (ServerConfig.HubProvisionDiskLimitPercent > 0)
+ {
+ DiskSpace Disk;
+ if (DiskSpaceInfo(ServerConfig.DataDir, Disk))
+ {
+ DiskTotal = Disk.Total;
+ }
+ else
+ {
+ ZEN_WARN("Failed to query disk space for '{}'; disk percent limit will not be applied", ServerConfig.DataDir);
+ }
+ }
+ if (ServerConfig.HubProvisionMemoryLimitPercent > 0)
+ {
+ MemoryTotal = GetSystemMetrics().SystemMemoryMiB * 1024 * 1024;
+ }
+
+ auto Resolve = [](uint64_t Bytes, uint32_t Pct, uint64_t Total) -> uint64_t {
+ const uint64_t PctBytes = Pct > 0 ? (Total * Pct) / 100 : 0;
+ if (Bytes > 0 && PctBytes > 0)
+ {
+ return Min(Bytes, PctBytes);
+ }
+ return Bytes > 0 ? Bytes : PctBytes;
+ };
+
+ return {
+ .DiskUsageBytes = Resolve(ServerConfig.HubProvisionDiskLimitBytes, ServerConfig.HubProvisionDiskLimitPercent, DiskTotal),
+ .MemoryUsageBytes = Resolve(ServerConfig.HubProvisionMemoryLimitBytes, ServerConfig.HubProvisionMemoryLimitPercent, MemoryTotal),
+ };
+}
+
void
ZenHubServer::InitializeServices(const ZenHubServerConfig& ServerConfig)
{
ZEN_INFO("instantiating Hub");
+ Hub::Configuration HubConfig{
+ .UseJobObject = ServerConfig.HubUseJobObject,
+ .BasePortNumber = ServerConfig.HubBasePortNumber,
+ .InstanceLimit = ServerConfig.HubInstanceLimit,
+ .InstanceHttpThreadCount = ServerConfig.HubInstanceHttpThreadCount,
+ .InstanceCoreLimit = ServerConfig.HubInstanceCoreLimit,
+ .InstanceMalloc = ServerConfig.HubInstanceMalloc,
+ .InstanceTrace = ServerConfig.HubInstanceTrace,
+ .InstanceTraceHost = ServerConfig.HubInstanceTraceHost,
+ .InstanceTraceFile = ServerConfig.HubInstanceTraceFile,
+ .InstanceConfigPath = ServerConfig.HubInstanceConfigPath,
+ .HydrationTargetSpecification = ServerConfig.HydrationTargetSpecification,
+ .WatchDog =
+ {
+ .CycleInterval = std::chrono::milliseconds(ServerConfig.WatchdogConfig.CycleIntervalMs),
+ .CycleProcessingBudget = std::chrono::milliseconds(ServerConfig.WatchdogConfig.CycleProcessingBudgetMs),
+ .InstanceCheckThrottle = std::chrono::milliseconds(ServerConfig.WatchdogConfig.InstanceCheckThrottleMs),
+ .ProvisionedInactivityTimeout = std::chrono::seconds(ServerConfig.WatchdogConfig.ProvisionedInactivityTimeoutSeconds),
+ .HibernatedInactivityTimeout = std::chrono::seconds(ServerConfig.WatchdogConfig.HibernatedInactivityTimeoutSeconds),
+ .InactivityCheckMargin = std::chrono::seconds(ServerConfig.WatchdogConfig.InactivityCheckMarginSeconds),
+ .ActivityCheckConnectTimeout = std::chrono::milliseconds(ServerConfig.WatchdogConfig.ActivityCheckConnectTimeoutMs),
+ .ActivityCheckRequestTimeout = std::chrono::milliseconds(ServerConfig.WatchdogConfig.ActivityCheckRequestTimeoutMs),
+ },
+ .ResourceLimits = ResolveLimits(ServerConfig),
+ .OptionalProvisionWorkerPool = m_ProvisionWorkerPool.get(),
+ .OptionalHydrationWorkerPool = m_HydrationWorkerPool.get()};
+
+ if (!ServerConfig.HydrationTargetConfigPath.empty())
+ {
+ FileContents Contents = ReadFile(ServerConfig.HydrationTargetConfigPath);
+ if (!Contents)
+ {
+ throw zen::runtime_error("Failed to read hydration config '{}': {}",
+ ServerConfig.HydrationTargetConfigPath.string(),
+ Contents.ErrorCode.message());
+ }
+ IoBuffer Buffer(Contents.Flatten());
+ std::string_view JsonText(static_cast<const char*>(Buffer.GetData()), Buffer.GetSize());
+
+ std::string ParseError;
+ CbFieldIterator Root = LoadCompactBinaryFromJson(JsonText, ParseError);
+ if (!ParseError.empty() || !Root.IsObject())
+ {
+ throw zen::runtime_error("Failed to parse hydration config '{}': {}",
+ ServerConfig.HydrationTargetConfigPath.string(),
+ ParseError.empty() ? "root must be a JSON object" : ParseError);
+ }
+ HubConfig.HydrationOptions = std::move(Root).AsObject();
+ }
+
+ m_Proxy = std::make_unique<HttpProxyHandler>();
+
m_Hub = std::make_unique<Hub>(
- Hub::Configuration{
- .UseJobObject = ServerConfig.HubUseJobObject,
- .BasePortNumber = ServerConfig.HubBasePortNumber,
- .InstanceLimit = ServerConfig.HubInstanceLimit,
- .InstanceHttpThreadCount = ServerConfig.HubInstanceHttpThreadCount,
- .InstanceCoreLimit = ServerConfig.HubInstanceCoreLimit,
- .InstanceConfigPath = ServerConfig.HubInstanceConfigPath,
- .HydrationTargetSpecification = ServerConfig.HydrationTargetSpecification,
- .WatchDog =
- {
- .CycleInterval = std::chrono::milliseconds(ServerConfig.WatchdogConfig.CycleIntervalMs),
- .CycleProcessingBudget = std::chrono::milliseconds(ServerConfig.WatchdogConfig.CycleProcessingBudgetMs),
- .InstanceCheckThrottle = std::chrono::milliseconds(ServerConfig.WatchdogConfig.InstanceCheckThrottleMs),
- .ProvisionedInactivityTimeout = std::chrono::seconds(ServerConfig.WatchdogConfig.ProvisionedInactivityTimeoutSeconds),
- .HibernatedInactivityTimeout = std::chrono::seconds(ServerConfig.WatchdogConfig.HibernatedInactivityTimeoutSeconds),
- .InactivityCheckMargin = std::chrono::seconds(ServerConfig.WatchdogConfig.InactivityCheckMarginSeconds),
- .ActivityCheckConnectTimeout = std::chrono::milliseconds(ServerConfig.WatchdogConfig.ActivityCheckConnectTimeoutMs),
- .ActivityCheckRequestTimeout = std::chrono::milliseconds(ServerConfig.WatchdogConfig.ActivityCheckRequestTimeoutMs),
- }},
+ std::move(HubConfig),
ZenServerEnvironment(ZenServerEnvironment::Hub,
ServerConfig.DataDir / "hub",
ServerConfig.DataDir / "servers",
ServerConfig.HubInstanceHttpClass),
- &GetMediumWorkerPool(EWorkloadType::Background),
- m_ConsulClient ? Hub::AsyncModuleStateChangeCallbackFunc{[this, HubInstanceId = fmt::format("zen-hub-{}", ServerConfig.InstanceId)](
- std::string_view ModuleId,
- const HubProvisionedInstanceInfo& Info,
- HubInstanceState PreviousState,
- HubInstanceState NewState) {
- OnModuleStateChanged(HubInstanceId, ModuleId, Info, PreviousState, NewState);
- }}
- : Hub::AsyncModuleStateChangeCallbackFunc{});
+ Hub::AsyncModuleStateChangeCallbackFunc{
+ [this, HubInstanceId = fmt::format("zen-hub-{}", ServerConfig.InstanceId)](std::string_view ModuleId,
+ const HubProvisionedInstanceInfo& Info,
+ HubInstanceState PreviousState,
+ HubInstanceState NewState) {
+ OnModuleStateChanged(HubInstanceId, ModuleId, Info, PreviousState, NewState);
+ }});
+
+ m_Proxy->SetPortValidator([Hub = m_Hub.get()](uint16_t Port) { return Hub->IsInstancePort(Port); });
ZEN_INFO("instantiating API service");
m_ApiService = std::make_unique<zen::HttpApiService>(*m_Http);
ZEN_INFO("instantiating hub service");
- m_HubService = std::make_unique<HttpHubService>(*m_Hub, m_StatsService, m_StatusService);
+ m_HubService = std::make_unique<HttpHubService>(*m_Hub, *m_Proxy, m_StatsService, m_StatusService);
m_HubService->SetNotificationEndpoint(ServerConfig.UpstreamNotificationEndpoint, ServerConfig.InstanceId);
m_FrontendService = std::make_unique<HttpFrontendService>(m_ContentRoot, m_StatsService, m_StatusService);
@@ -465,21 +771,32 @@ ZenHubServer::InitializeConsulRegistration(const ZenHubServerConfig& ServerConfi
}
else
{
- ZEN_INFO("Consul token read from environment variable '{}'", ConsulAccessTokenEnvName);
+ ZEN_INFO("Consul token will be read from environment variable '{}'", ConsulAccessTokenEnvName);
}
try
{
- m_ConsulClient = std::make_unique<consul::ConsulClient>(ServerConfig.ConsulEndpoint, ConsulAccessToken);
+ m_ConsulClient = std::make_unique<consul::ConsulClient>(consul::ConsulClient::Configuration{
+ .BaseUri = ServerConfig.ConsulEndpoint,
+ .TokenEnvName = ConsulAccessTokenEnvName,
+ });
m_ConsulHealthIntervalSeconds = ServerConfig.ConsulHealthIntervalSeconds;
m_ConsulDeregisterAfterSeconds = ServerConfig.ConsulDeregisterAfterSeconds;
+ if (!ServerConfig.ConsulRegisterHub)
+ {
+ ZEN_INFO(
+ "Hub parent Consul registration skipped (consul-register-hub is false); "
+ "instance registration remains enabled");
+ return;
+ }
+
consul::ServiceRegistrationInfo Info;
Info.ServiceId = fmt::format("zen-hub-{}", ServerConfig.InstanceId);
Info.ServiceName = "zen-hub";
// Info.Address = "localhost"; // Let the consul agent figure out out external address // TODO: Info.BaseUri?
Info.Port = static_cast<uint16_t>(EffectivePort);
- Info.HealthEndpoint = "hub/health";
+ Info.HealthEndpoint = "health";
Info.Tags = std::vector<std::pair<std::string, std::string>>{
std::make_pair("zen-hub", Info.ServiceId),
std::make_pair("version", std::string(ZEN_CFG_VERSION)),
@@ -569,6 +886,8 @@ ZenHubServer::Run()
OnReady();
+ StartSelfSession("zenhub");
+
m_Http->Run(IsInteractiveMode);
SetNewState(kShuttingDown);
diff --git a/src/zenserver/hub/zenhubserver.h b/src/zenserver/hub/zenhubserver.h
index 77df3eaa3..5e465bb14 100644
--- a/src/zenserver/hub/zenhubserver.h
+++ b/src/zenserver/hub/zenhubserver.h
@@ -3,8 +3,10 @@
#pragma once
#include "hubinstancestate.h"
+#include "resourcemetrics.h"
#include "zenserver.h"
+#include <zencore/workthreadpool.h>
#include <zenutil/consul.h>
namespace cxxopts {
@@ -19,6 +21,7 @@ namespace zen {
class HttpApiService;
class HttpFrontendService;
class HttpHubService;
+class HttpProxyHandler;
struct ZenHubWatchdogConfig
{
@@ -34,21 +37,33 @@ struct ZenHubWatchdogConfig
struct ZenHubServerConfig : public ZenServerConfig
{
- std::string UpstreamNotificationEndpoint;
- std::string InstanceId; // For use in notifications
- std::string ConsulEndpoint; // If set, enables Consul service registration
- std::string ConsulTokenEnv; // Environment variable name to read a Consul token from; defaults to CONSUL_HTTP_TOKEN if empty
- uint32_t ConsulHealthIntervalSeconds = 10; // Interval in seconds between Consul health checks
- uint32_t ConsulDeregisterAfterSeconds = 30; // Seconds before Consul deregisters an unhealthy service
- uint16_t HubBasePortNumber = 21000;
- int HubInstanceLimit = 1000;
- bool HubUseJobObject = true;
- std::string HubInstanceHttpClass = "asio";
- uint32_t HubInstanceHttpThreadCount = 0; // Automatic
- int HubInstanceCoreLimit = 0; // Automatic
- std::filesystem::path HubInstanceConfigPath; // Path to Lua config file
- std::string HydrationTargetSpecification; // hydration/dehydration target specification
+ std::string UpstreamNotificationEndpoint;
+ std::string InstanceId; // For use in notifications
+ std::string ConsulEndpoint; // If set, enables Consul service registration
+ std::string ConsulTokenEnv; // Environment variable name to read a Consul token from; defaults to CONSUL_HTTP_TOKEN if empty
+ uint32_t ConsulHealthIntervalSeconds = 10; // Interval in seconds between Consul health checks
+ uint32_t ConsulDeregisterAfterSeconds = 30; // Seconds before Consul deregisters an unhealthy service
+ bool ConsulRegisterHub = true; // Whether to register the hub parent service with Consul (instance registration unaffected)
+ uint16_t HubBasePortNumber = 21000;
+ int HubInstanceLimit = 1000;
+ bool HubUseJobObject = true;
+ std::string HubInstanceHttpClass = "asio";
+ std::string HubInstanceMalloc;
+ std::string HubInstanceTrace;
+ std::string HubInstanceTraceHost;
+ std::string HubInstanceTraceFile;
+ uint32_t HubInstanceHttpThreadCount = 0; // Automatic
+ uint32_t HubInstanceProvisionThreadCount = 0; // Synchronous provisioning
+ uint32_t HubHydrationThreadCount = 0; // Synchronous hydration/dehydration
+ int HubInstanceCoreLimit = 0; // Automatic
+ std::filesystem::path HubInstanceConfigPath; // Path to Lua config file
+ std::string HydrationTargetSpecification; // hydration/dehydration target specification
+ std::filesystem::path HydrationTargetConfigPath; // path to JSON config file (mutually exclusive with HydrationTargetSpecification)
ZenHubWatchdogConfig WatchdogConfig;
+ uint64_t HubProvisionDiskLimitBytes = 0;
+ uint32_t HubProvisionDiskLimitPercent = 0;
+ uint64_t HubProvisionMemoryLimitBytes = 0;
+ uint32_t HubProvisionMemoryLimitPercent = 0;
};
class Hub;
@@ -115,7 +130,10 @@ private:
std::filesystem::path m_ContentRoot;
bool m_DebugOptionForcedCrash = false;
- std::unique_ptr<Hub> m_Hub;
+ std::unique_ptr<HttpProxyHandler> m_Proxy;
+ std::unique_ptr<WorkerThreadPool> m_ProvisionWorkerPool;
+ std::unique_ptr<WorkerThreadPool> m_HydrationWorkerPool;
+ std::unique_ptr<Hub> m_Hub;
std::unique_ptr<HttpHubService> m_HubService;
std::unique_ptr<HttpApiService> m_ApiService;
@@ -126,6 +144,8 @@ private:
uint32_t m_ConsulHealthIntervalSeconds = 10;
uint32_t m_ConsulDeregisterAfterSeconds = 30;
+ static ResourceMetrics ResolveLimits(const ZenHubServerConfig& ServerConfig);
+
void InitializeState(const ZenHubServerConfig& ServerConfig);
void InitializeServices(const ZenHubServerConfig& ServerConfig);
void RegisterServices(const ZenHubServerConfig& ServerConfig);