aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2026-03-27 11:13:02 +0100
committerGitHub Enterprise <[email protected]>2026-03-27 11:13:02 +0100
commit776d76d299748a79b9cb25593cd8266cb26a6553 (patch)
treeb827b4d3f5a497d4ba851991db9fbe4b44860405 /src
parentupdate Oodle 2.9.14 -> 2.9.15 (#893) (diff)
downloadzen-776d76d299748a79b9cb25593cd8266cb26a6553.tar.xz
zen-776d76d299748a79b9cb25593cd8266cb26a6553.zip
idle deprovision in hub (#895)
- Feature: Hub watchdog automatically deprovisions inactive provisioned and hibernated instances - Feature: Added `stats/activity_counters` endpoint to measure server activity - Feature: Added configuration options for hub watchdog - `--hub-watchdog-provisioned-inactivity-timeout-seconds` Inactivity timeout before a provisioned instance is deprovisioned - `--hub-watchdog-hibernated-inactivity-timeout-seconds` Inactivity timeout before a hibernated instance is deprovisioned - `--hub-watchdog-inactivity-check-margin-seconds` Margin before timeout at which an activity check is issued - `--hub-watchdog-cycle-interval-ms` Watchdog poll interval in milliseconds - `--hub-watchdog-cycle-processing-budget-ms` Maximum time budget per watchdog cycle in milliseconds - `--hub-watchdog-instance-check-throttle-ms` Minimum delay between checks on a single instance - `--hub-watchdog-activity-check-connect-timeout-ms` Connect timeout for activity check requests - `--hub-watchdog-activity-check-request-timeout-ms` Request timeout for activity check requests
Diffstat (limited to 'src')
-rw-r--r--src/zenhttp/clients/httpclientcommon.h1
-rw-r--r--src/zenhttp/httpclient.cpp72
-rw-r--r--src/zenhttp/httpserver.cpp97
-rw-r--r--src/zenhttp/include/zenhttp/auth/authservice.h4
-rw-r--r--src/zenhttp/include/zenhttp/httpclient.h1
-rw-r--r--src/zenhttp/include/zenhttp/httpserver.h8
-rw-r--r--src/zenhttp/include/zenhttp/httpstats.h1
-rw-r--r--src/zenhttp/monitoring/httpstats.cpp154
-rw-r--r--src/zenserver/compute/computeserver.cpp2
-rw-r--r--src/zenserver/frontend/frontend.cpp24
-rw-r--r--src/zenserver/frontend/frontend.h20
-rw-r--r--src/zenserver/hub/httphubservice.cpp46
-rw-r--r--src/zenserver/hub/httphubservice.h19
-rw-r--r--src/zenserver/hub/hub.cpp348
-rw-r--r--src/zenserver/hub/hub.h29
-rw-r--r--src/zenserver/hub/zenhubserver.cpp88
-rw-r--r--src/zenserver/hub/zenhubserver.h13
-rw-r--r--src/zenserver/proxy/httpproxystats.cpp12
-rw-r--r--src/zenserver/proxy/zenproxyserver.cpp2
-rw-r--r--src/zenserver/sessions/httpsessions.cpp30
-rw-r--r--src/zenserver/sessions/httpsessions.h5
-rw-r--r--src/zenserver/storage/admin/admin.h4
-rw-r--r--src/zenserver/storage/buildstore/httpbuildstore.cpp40
-rw-r--r--src/zenserver/storage/buildstore/httpbuildstore.h9
-rw-r--r--src/zenserver/storage/cache/httpstructuredcache.cpp224
-rw-r--r--src/zenserver/storage/cache/httpstructuredcache.h5
-rw-r--r--src/zenserver/storage/objectstore/objectstore.cpp75
-rw-r--r--src/zenserver/storage/objectstore/objectstore.h35
-rw-r--r--src/zenserver/storage/projectstore/httpprojectstore.cpp30
-rw-r--r--src/zenserver/storage/projectstore/httpprojectstore.h5
-rw-r--r--src/zenserver/storage/upstream/upstreamservice.h4
-rw-r--r--src/zenserver/storage/workspaces/httpworkspaces.cpp30
-rw-r--r--src/zenserver/storage/workspaces/httpworkspaces.h5
-rw-r--r--src/zenserver/storage/zenstorageserver.cpp4
34 files changed, 1093 insertions, 353 deletions
diff --git a/src/zenhttp/clients/httpclientcommon.h b/src/zenhttp/clients/httpclientcommon.h
index c30edab33..078d4a52f 100644
--- a/src/zenhttp/clients/httpclientcommon.h
+++ b/src/zenhttp/clients/httpclientcommon.h
@@ -62,6 +62,7 @@ public:
LoggerRef Log() { return m_Log; }
std::string_view GetBaseUri() const { return m_BaseUri; }
+ void SetBaseUri(std::string_view NewBaseUri) { m_BaseUri = NewBaseUri; }
std::string_view GetSessionId() const { return m_SessionId; }
bool Authenticate();
diff --git a/src/zenhttp/httpclient.cpp b/src/zenhttp/httpclient.cpp
index 4000ea8a8..96107883e 100644
--- a/src/zenhttp/httpclient.cpp
+++ b/src/zenhttp/httpclient.cpp
@@ -402,6 +402,13 @@ HttpClient::~HttpClient()
}
void
+HttpClient::SetBaseUri(std::string_view NewBaseUri)
+{
+ m_BaseUri = NewBaseUri;
+ m_Inner->SetBaseUri(NewBaseUri);
+}
+
+void
HttpClient::SetSessionId(const Oid& SessionId)
{
if (SessionId == Oid::Zero)
@@ -980,6 +987,71 @@ TEST_CASE("httpclient.password")
AsioServer->RequestExit();
}
}
+TEST_CASE("httpclient.setbaseuri")
+{
+ struct TestHttpService : public HttpService
+ {
+ explicit TestHttpService(std::string_view Identity) : m_Identity(Identity) {}
+
+ virtual const char* BaseUri() const override { return "/test/"; }
+ virtual void HandleRequest(HttpServerRequest& Req) override
+ {
+ Req.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, m_Identity);
+ }
+
+ std::string m_Identity;
+ };
+
+ ScopedTemporaryDirectory TmpDir1;
+ ScopedTemporaryDirectory TmpDir2;
+ TestHttpService Service1("server-one");
+ TestHttpService Service2("server-two");
+
+ Ref<HttpServer> Server1 = CreateHttpAsioServer(AsioConfig{});
+ Ref<HttpServer> Server2 = CreateHttpAsioServer(AsioConfig{});
+
+ int Port1 = Server1->Initialize(0, TmpDir1.Path());
+ int Port2 = Server2->Initialize(0, TmpDir2.Path());
+ REQUIRE(Port1 != -1);
+ REQUIRE(Port2 != -1);
+
+ Server1->RegisterService(Service1);
+ Server2->RegisterService(Service2);
+
+ std::thread Thread1([&]() { Server1->Run(false); });
+ std::thread Thread2([&]() { Server2->Run(false); });
+
+ auto _ = MakeGuard([&]() {
+ if (Thread1.joinable())
+ {
+ Thread1.join();
+ }
+ if (Thread2.joinable())
+ {
+ Thread2.join();
+ }
+ Server1->Close();
+ Server2->Close();
+ });
+
+ HttpClient Client(fmt::format("127.0.0.1:{}", Port1), HttpClientSettings{}, {});
+ CHECK_EQ(Client.GetBaseUri(), fmt::format("127.0.0.1:{}", Port1));
+
+ HttpClient::Response Resp1 = Client.Get("/test/hello");
+ CHECK(Resp1.IsSuccess());
+ CHECK_EQ(Resp1.AsText(), "server-one");
+
+ Client.SetBaseUri(fmt::format("127.0.0.1:{}", Port2));
+ CHECK_EQ(Client.GetBaseUri(), fmt::format("127.0.0.1:{}", Port2));
+
+ HttpClient::Response Resp2 = Client.Get("/test/hello");
+ CHECK(Resp2.IsSuccess());
+ CHECK_EQ(Resp2.AsText(), "server-two");
+
+ Server1->RequestExit();
+ Server2->RequestExit();
+}
+
TEST_SUITE_END();
void
diff --git a/src/zenhttp/httpserver.cpp b/src/zenhttp/httpserver.cpp
index a46c5b851..9fc42f18c 100644
--- a/src/zenhttp/httpserver.cpp
+++ b/src/zenhttp/httpserver.cpp
@@ -798,7 +798,18 @@ HttpRequestRouter::HandleRequest(zen::HttpServerRequest& Request)
const HttpVerb Verb = Request.RequestVerb();
- std::string_view Uri = Request.RelativeUri();
+ std::string_view Uri = Request.RelativeUri();
+
+ // Strip the separator slash left over after the service prefix is removed.
+ // When a service has BaseUri "/foo", the prefix length is set to len("/foo") = 4.
+ // Stripping 4 chars from "/foo/bar" yields "/bar" — the path separator becomes
+ // the first character of the relative URI. Remove it so patterns like "bar" or
+ // "{id}" match without needing to account for the leading slash.
+ if (!Uri.empty() && Uri.front() == '/')
+ {
+ Uri.remove_prefix(1);
+ }
+
HttpRouterRequest RouterRequest(Request);
for (const MatcherEndpoint& Handler : m_MatcherEndpoints)
@@ -974,6 +985,12 @@ HttpServer::SetHttpRequestFilter(IHttpRequestFilter* RequestFilter)
OnSetHttpRequestFilter(RequestFilter);
}
+void
+HttpServer::HandleStatsRequest(HttpServerRequest& Request)
+{
+ Request.WriteResponse(HttpResponseCode::OK, CollectStats());
+}
+
CbObject
HttpServer::CollectStats()
{
@@ -1004,12 +1021,6 @@ HttpServer::CollectStats()
return Cbo.Save();
}
-void
-HttpServer::HandleStatsRequest(HttpServerRequest& Request)
-{
- Request.WriteResponse(HttpResponseCode::OK, CollectStats());
-}
-
//////////////////////////////////////////////////////////////////////////
HttpRpcHandler::HttpRpcHandler()
@@ -1446,6 +1457,78 @@ TEST_CASE("http.common")
}
}
+ SUBCASE("router-leading-slash")
+ {
+ // Verify that HandleRequest strips the leading slash that server implementations
+ // leave in RelativeUri() when the service base URI has no trailing slash.
+ // e.g. BaseUri "/stats" + prefix-strip of "/stats/foo" yields "/foo", not "foo".
+
+ bool HandledLiteral = false;
+ bool HandledPattern = false;
+ bool HandledTwoSeg = false;
+ std::vector<std::string> Captures;
+ auto Reset = [&] {
+ HandledLiteral = HandledPattern = HandledTwoSeg = false;
+ Captures.clear();
+ };
+
+ TestHttpService Service;
+ HttpRequestRouter r;
+
+ r.AddMatcher("seg", [](std::string_view In) -> bool { return !In.empty() && In.find('/') == std::string_view::npos; });
+
+ r.RegisterRoute(
+ "activity_counters",
+ [&](auto& /*Req*/) { HandledLiteral = true; },
+ HttpVerb::kGet);
+
+ r.RegisterRoute(
+ "{seg}",
+ [&](auto& Req) {
+ HandledPattern = true;
+ Captures = {std::string(Req.GetCapture(1))};
+ },
+ HttpVerb::kGet);
+
+ r.RegisterRoute(
+ "prefix/{seg}",
+ [&](auto& Req) {
+ HandledTwoSeg = true;
+ Captures = {std::string(Req.GetCapture(1))};
+ },
+ HttpVerb::kGet);
+
+ // Single-segment literal with leading slash — simulates real server RelativeUri
+ {
+ Reset();
+ TestHttpServerRequest req{Service, "/activity_counters"sv};
+ r.HandleRequest(req);
+ CHECK(HandledLiteral);
+ CHECK(!HandledPattern);
+ }
+
+ // Single-segment pattern with leading slash
+ {
+ Reset();
+ TestHttpServerRequest req{Service, "/hello"sv};
+ r.HandleRequest(req);
+ CHECK(!HandledLiteral);
+ CHECK(HandledPattern);
+ REQUIRE_EQ(Captures.size(), 1);
+ CHECK_EQ(Captures[0], "hello"sv);
+ }
+
+ // Two-segment route with leading slash — first literal segment
+ {
+ Reset();
+ TestHttpServerRequest req{Service, "/prefix/world"sv};
+ r.HandleRequest(req);
+ CHECK(HandledTwoSeg);
+ REQUIRE_EQ(Captures.size(), 1);
+ CHECK_EQ(Captures[0], "world"sv);
+ }
+ }
+
SUBCASE("content-type")
{
for (uint8_t i = 0; i < uint8_t(HttpContentType::kCOUNT); ++i)
diff --git a/src/zenhttp/include/zenhttp/auth/authservice.h b/src/zenhttp/include/zenhttp/auth/authservice.h
index 64b86e21f..ee67c0f5b 100644
--- a/src/zenhttp/include/zenhttp/auth/authservice.h
+++ b/src/zenhttp/include/zenhttp/auth/authservice.h
@@ -8,14 +8,14 @@ namespace zen {
class AuthMgr;
-class HttpAuthService final : public zen::HttpService
+class HttpAuthService final : public HttpService
{
public:
HttpAuthService(AuthMgr& AuthMgr);
virtual ~HttpAuthService();
virtual const char* BaseUri() const override;
- virtual void HandleRequest(zen::HttpServerRequest& Request) override;
+ virtual void HandleRequest(HttpServerRequest& Request) override;
private:
AuthMgr& m_AuthMgr;
diff --git a/src/zenhttp/include/zenhttp/httpclient.h b/src/zenhttp/include/zenhttp/httpclient.h
index b0d74951e..26d60b9ae 100644
--- a/src/zenhttp/include/zenhttp/httpclient.h
+++ b/src/zenhttp/include/zenhttp/httpclient.h
@@ -364,6 +364,7 @@ public:
LoggerRef Log() { return m_Log; }
std::string_view GetBaseUri() const { return m_BaseUri; }
std::string_view GetSessionId() const { return m_SessionId; }
+ void SetBaseUri(std::string_view NewBaseUri);
void SetSessionId(const Oid& SessionId);
bool Authenticate();
diff --git a/src/zenhttp/include/zenhttp/httpserver.h b/src/zenhttp/include/zenhttp/httpserver.h
index 633eb06be..5eaed6004 100644
--- a/src/zenhttp/include/zenhttp/httpserver.h
+++ b/src/zenhttp/include/zenhttp/httpserver.h
@@ -220,6 +220,12 @@ struct IHttpStatsProvider
* not override this will be skipped in WebSocket broadcasts.
*/
virtual CbObject CollectStats() { return {}; }
+
+ /** Return a number indicating activity. Increase the number
+ * when activity is detected. Example would be to return the
+ * number of received requests
+ */
+ virtual uint64_t GetActivityCounter() { return 0; }
};
struct IHttpStatsService
@@ -302,8 +308,8 @@ public:
}
// IHttpStatsProvider
- virtual CbObject CollectStats() override;
virtual void HandleStatsRequest(HttpServerRequest& Request) override;
+ virtual CbObject CollectStats() override;
private:
std::vector<HttpService*> m_KnownServices;
diff --git a/src/zenhttp/include/zenhttp/httpstats.h b/src/zenhttp/include/zenhttp/httpstats.h
index 460315faf..bce771c75 100644
--- a/src/zenhttp/include/zenhttp/httpstats.h
+++ b/src/zenhttp/include/zenhttp/httpstats.h
@@ -62,6 +62,7 @@ private:
std::atomic<bool> m_PushEnabled{false};
void BroadcastStats();
+ void Initialize();
// Thread-based push (when no io_context is provided)
std::thread m_PushThread;
diff --git a/src/zenhttp/monitoring/httpstats.cpp b/src/zenhttp/monitoring/httpstats.cpp
index 283cedca7..7e6207e56 100644
--- a/src/zenhttp/monitoring/httpstats.cpp
+++ b/src/zenhttp/monitoring/httpstats.cpp
@@ -16,6 +16,7 @@ HttpStatsService::HttpStatsService(bool EnableWebSockets) : m_Log(logging::Get("
m_PushEnabled.store(true);
m_PushThread = std::thread([this] { PushThreadFunction(); });
}
+ Initialize();
}
HttpStatsService::HttpStatsService(asio::io_context& IoContext, bool EnableWebSockets) : m_Log(logging::Get("stats"))
@@ -26,6 +27,110 @@ HttpStatsService::HttpStatsService(asio::io_context& IoContext, bool EnableWebSo
m_PushTimer = std::make_unique<asio::steady_timer>(IoContext);
EnqueuePushTimer();
}
+ Initialize();
+}
+
+void
+HttpStatsService::Initialize()
+{
+ m_Router.AddMatcher("handler_id", [](std::string_view Str) -> bool {
+ if (Str.empty())
+ {
+ return false;
+ }
+ for (const auto C : Str)
+ {
+ if (std::isalnum(C) || C == '$')
+ {
+ // fine
+ }
+ else
+ {
+ // not fine
+ return false;
+ }
+ }
+ return true;
+ });
+
+ m_Router.RegisterRoute(
+ "activity_counters",
+ [this](HttpRouterRequest& Request) {
+ CbObjectWriter Obj;
+
+ std::uint64_t SumActivity = 0;
+
+ std::vector<std::pair<std::string, uint64_t>> Activities;
+ {
+ RwLock::SharedLockScope _(m_Lock);
+ Activities.reserve(m_Providers.size());
+ for (const auto& It : m_Providers)
+ {
+ const std::string& HandlerName = It.first;
+ IHttpStatsProvider* Provider = It.second;
+ ZEN_ASSERT(Provider != nullptr);
+ uint64_t ProviderActivityCounter = Provider->GetActivityCounter();
+ if (ProviderActivityCounter != 0)
+ {
+ Activities.push_back(std::make_pair(HandlerName, ProviderActivityCounter));
+ }
+ SumActivity += ProviderActivityCounter;
+ }
+ }
+
+ Obj.BeginArray("providers");
+ for (const std::pair<std::string, uint64_t>& Activity : Activities)
+ {
+ const std::string& HandlerName = Activity.first;
+ uint64_t ProviderActivityCounter = Activity.second;
+ Obj.BeginObject();
+ {
+ Obj.AddString("provider", HandlerName);
+ Obj.AddInteger("activity_counter", ProviderActivityCounter);
+ }
+ Obj.EndObject();
+ }
+ Obj.EndArray();
+
+ Obj.AddInteger("sum", SumActivity);
+
+ Request.ServerRequest().WriteResponse(HttpResponseCode::OK, Obj.Save());
+ },
+ HttpVerb::kGet);
+
+ m_Router.RegisterRoute(
+ "{handler_id}",
+ [this](HttpRouterRequest& Request) {
+ std::string_view Handler = Request.GetCapture(1);
+ RwLock::SharedLockScope _(m_Lock);
+ if (auto It = m_Providers.find(std::string{Handler}); It != end(m_Providers))
+ {
+ return It->second->HandleStatsRequest(Request.ServerRequest());
+ }
+ Request.ServerRequest().WriteResponse(HttpResponseCode::NotFound);
+ },
+ HttpVerb::kHead | HttpVerb::kGet);
+
+ m_Router.RegisterRoute(
+ "",
+ [this](HttpRouterRequest& Request) {
+ CbObjectWriter Cbo;
+
+ Cbo.BeginArray("providers");
+
+ {
+ RwLock::SharedLockScope _(m_Lock);
+ for (auto& Kv : m_Providers)
+ {
+ Cbo << Kv.first;
+ }
+ }
+
+ Cbo.EndArray();
+
+ Request.ServerRequest().WriteResponse(HttpResponseCode::OK, Cbo.Save());
+ },
+ HttpVerb::kHead | HttpVerb::kGet);
}
HttpStatsService::~HttpStatsService()
@@ -82,54 +187,7 @@ void
HttpStatsService::HandleRequest(HttpServerRequest& Request)
{
ZEN_TRACE_CPU("HttpStatsService::HandleRequest");
- using namespace std::literals;
-
- std::string_view Key = Request.RelativeUri();
-
- switch (Request.RequestVerb())
- {
- case HttpVerb::kHead:
- case HttpVerb::kGet:
- {
- if (Key.empty())
- {
- CbObjectWriter Cbo;
-
- Cbo.BeginArray("providers");
-
- {
- RwLock::SharedLockScope _(m_Lock);
- for (auto& Kv : m_Providers)
- {
- Cbo << Kv.first;
- }
- }
-
- Cbo.EndArray();
-
- Request.WriteResponse(HttpResponseCode::OK, Cbo.Save());
- }
- else if (Key[0] == '/')
- {
- Key.remove_prefix(1);
- size_t SlashPos = Key.find_first_of("/?");
- if (SlashPos != std::string::npos)
- {
- Key = Key.substr(0, SlashPos);
- }
-
- RwLock::SharedLockScope _(m_Lock);
- if (auto It = m_Providers.find(std::string{Key}); It != end(m_Providers))
- {
- return It->second->HandleStatsRequest(Request);
- }
- }
- }
-
- [[fallthrough]];
- default:
- return;
- }
+ m_Router.HandleRequest(Request);
}
//////////////////////////////////////////////////////////////////////////
diff --git a/src/zenserver/compute/computeserver.cpp b/src/zenserver/compute/computeserver.cpp
index d1875f41a..1673cea6c 100644
--- a/src/zenserver/compute/computeserver.cpp
+++ b/src/zenserver/compute/computeserver.cpp
@@ -481,7 +481,7 @@ ZenComputeServer::InitializeServices(const ZenComputeServerConfig& ServerConfig)
ServerConfig.DataDir / "functions",
ServerConfig.MaxConcurrentActions);
- m_FrontendService = std::make_unique<HttpFrontendService>(m_ContentRoot, m_StatusService);
+ m_FrontendService = std::make_unique<HttpFrontendService>(m_ContentRoot, m_StatsService, m_StatusService);
# if ZEN_WITH_NOMAD
// Nomad provisioner
diff --git a/src/zenserver/frontend/frontend.cpp b/src/zenserver/frontend/frontend.cpp
index 697cc014e..fa7b580e8 100644
--- a/src/zenserver/frontend/frontend.cpp
+++ b/src/zenserver/frontend/frontend.cpp
@@ -9,6 +9,7 @@
#include <zencore/logging.h>
#include <zencore/string.h>
#include <zencore/trace.h>
+#include <zenhttp/httpstats.h>
ZEN_THIRD_PARTY_INCLUDES_START
#if ZEN_PLATFORM_WINDOWS
@@ -28,8 +29,9 @@ static unsigned char gHtmlZipData[] = {
namespace zen {
////////////////////////////////////////////////////////////////////////////////
-HttpFrontendService::HttpFrontendService(std::filesystem::path Directory, HttpStatusService& StatusService)
+HttpFrontendService::HttpFrontendService(std::filesystem::path Directory, HttpStatsService& StatsService, HttpStatusService& StatusService)
: m_Directory(Directory)
+, m_StatsService(StatsService)
, m_StatusService(StatusService)
{
ZEN_TRACE_CPU("HttpFrontendService::HttpFrontendService");
@@ -94,12 +96,14 @@ HttpFrontendService::HttpFrontendService(std::filesystem::path Directory, HttpSt
{
ZEN_INFO("front-end is NOT AVAILABLE");
}
+ m_StatsService.RegisterHandler("dashboard", *this);
m_StatusService.RegisterHandler("dashboard", *this);
}
HttpFrontendService::~HttpFrontendService()
{
m_StatusService.UnregisterHandler("dashboard", *this);
+ m_StatsService.UnregisterHandler("dashboard", *this);
}
const char*
@@ -122,6 +126,8 @@ HttpFrontendService::HandleRequest(zen::HttpServerRequest& Request)
{
using namespace std::literals;
+ metrics::OperationTiming::Scope $(m_HttpRequests);
+
ExtendableStringBuilder<256> UriBuilder;
std::string_view Uri = Request.RelativeUriWithExtension();
@@ -230,4 +236,20 @@ HttpFrontendService::HandleRequest(zen::HttpServerRequest& Request)
}
}
+void
+HttpFrontendService::HandleStatsRequest(HttpServerRequest& Request)
+{
+ CbObjectWriter Cbo;
+
+ EmitSnapshot("requests", m_HttpRequests, Cbo);
+
+ Request.WriteResponse(HttpResponseCode::OK, Cbo.Save());
+}
+
+uint64_t
+HttpFrontendService::GetActivityCounter()
+{
+ return m_HttpRequests.Count();
+}
+
} // namespace zen
diff --git a/src/zenserver/frontend/frontend.h b/src/zenserver/frontend/frontend.h
index 0ae3170ad..541e6213b 100644
--- a/src/zenserver/frontend/frontend.h
+++ b/src/zenserver/frontend/frontend.h
@@ -11,20 +11,26 @@
namespace zen {
-class HttpFrontendService final : public zen::HttpService, public IHttpStatusProvider
+class HttpStatsService;
+
+class HttpFrontendService final : public HttpService, public IHttpStatusProvider, public IHttpStatsProvider
{
public:
- HttpFrontendService(std::filesystem::path Directory, HttpStatusService& StatusService);
+ HttpFrontendService(std::filesystem::path Directory, HttpStatsService& StatsService, HttpStatusService& StatusService);
virtual ~HttpFrontendService();
virtual const char* BaseUri() const override;
- virtual void HandleRequest(zen::HttpServerRequest& Request) override;
+ virtual void HandleRequest(HttpServerRequest& Request) override;
virtual void HandleStatusRequest(HttpServerRequest& Request) override;
+ virtual void HandleStatsRequest(HttpServerRequest& Request) override;
+ virtual uint64_t GetActivityCounter() override;
private:
- std::unique_ptr<ZipFs> m_ZipFs;
- std::filesystem::path m_Directory;
- std::filesystem::path m_DocsDirectory;
- HttpStatusService& m_StatusService;
+ std::unique_ptr<ZipFs> m_ZipFs;
+ std::filesystem::path m_Directory;
+ std::filesystem::path m_DocsDirectory;
+ HttpStatsService& m_StatsService;
+ HttpStatusService& m_StatusService;
+ metrics::OperationTiming m_HttpRequests;
};
} // namespace zen
diff --git a/src/zenserver/hub/httphubservice.cpp b/src/zenserver/hub/httphubservice.cpp
index 032a61f08..dcab50f2d 100644
--- a/src/zenserver/hub/httphubservice.cpp
+++ b/src/zenserver/hub/httphubservice.cpp
@@ -8,6 +8,7 @@
#include <zencore/compactbinarybuilder.h>
#include <zencore/fmtutils.h>
#include <zencore/logging.h>
+#include <zenhttp/httpstats.h>
namespace zen {
@@ -42,7 +43,10 @@ namespace {
}
} // namespace
-HttpHubService::HttpHubService(Hub& Hub) : m_Hub(Hub)
+HttpHubService::HttpHubService(Hub& Hub, HttpStatsService& StatsService, HttpStatusService& StatusService)
+: m_Hub(Hub)
+, m_StatsService(StatsService)
+, m_StatusService(StatusService)
{
using namespace std::literals;
@@ -234,10 +238,15 @@ HttpHubService::HttpHubService(Hub& Hub) : m_Hub(Hub)
Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Obj.Save());
},
HttpVerb::kGet);
+
+ m_StatsService.RegisterHandler("hub", *this);
+ m_StatusService.RegisterHandler("hub", *this);
}
HttpHubService::~HttpHubService()
{
+ m_StatusService.UnregisterHandler("hub", *this);
+ m_StatsService.UnregisterHandler("hub", *this);
}
const char*
@@ -254,9 +263,40 @@ HttpHubService::SetNotificationEndpoint(std::string_view UpstreamNotificationEnd
}
void
-HttpHubService::HandleRequest(zen::HttpServerRequest& Request)
+HttpHubService::HandleRequest(HttpServerRequest& Request)
+{
+ using namespace std::literals;
+
+ metrics::OperationTiming::Scope $(m_HttpRequests);
+ if (m_Router.HandleRequest(Request) == false)
+ {
+ ZEN_WARN("No route found for {0}", Request.RelativeUri());
+ return Request.WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, "Not found"sv);
+ }
+}
+
+void
+HttpHubService::HandleStatusRequest(HttpServerRequest& Request)
+{
+ CbObjectWriter Cbo;
+ Cbo << "ok" << true;
+ Request.WriteResponse(HttpResponseCode::OK, Cbo.Save());
+}
+
+void
+HttpHubService::HandleStatsRequest(HttpServerRequest& Request)
+{
+ CbObjectWriter Cbo;
+
+ EmitSnapshot("requests", m_HttpRequests, Cbo);
+
+ Request.WriteResponse(HttpResponseCode::OK, Cbo.Save());
+}
+
+uint64_t
+HttpHubService::GetActivityCounter()
{
- m_Router.HandleRequest(Request);
+ return m_HttpRequests.Count();
}
void
diff --git a/src/zenserver/hub/httphubservice.h b/src/zenserver/hub/httphubservice.h
index d08eeea2a..5f940017e 100644
--- a/src/zenserver/hub/httphubservice.h
+++ b/src/zenserver/hub/httphubservice.h
@@ -3,9 +3,11 @@
#pragma once
#include <zenhttp/httpserver.h>
+#include <zenhttp/httpstatus.h>
namespace zen {
+class HttpStatsService;
class Hub;
/** ZenServer Hub Service
@@ -14,25 +16,32 @@ class Hub;
* use in UEFN content worker style scenarios.
*
*/
-class HttpHubService : public zen::HttpService
+class HttpHubService : public HttpService, public IHttpStatusProvider, public IHttpStatsProvider
{
public:
- HttpHubService(Hub& Hub);
+ HttpHubService(Hub& Hub, HttpStatsService& StatsService, HttpStatusService& StatusService);
~HttpHubService();
HttpHubService(const HttpHubService&) = delete;
HttpHubService& operator=(const HttpHubService&) = delete;
virtual const char* BaseUri() const override;
- virtual void HandleRequest(zen::HttpServerRequest& Request) override;
+ virtual void HandleRequest(HttpServerRequest& Request) override;
+ virtual void HandleStatusRequest(HttpServerRequest& Request) override;
+ virtual void HandleStatsRequest(HttpServerRequest& Request) override;
+ virtual uint64_t GetActivityCounter() override;
void SetNotificationEndpoint(std::string_view UpstreamNotificationEndpoint, std::string_view InstanceId);
private:
- HttpRequestRouter m_Router;
-
Hub& m_Hub;
+ HttpRequestRouter m_Router;
+ metrics::OperationTiming m_HttpRequests;
+
+ HttpStatsService& m_StatsService;
+ HttpStatusService& m_StatusService;
+
void HandleModuleGet(HttpServerRequest& Request, std::string_view ModuleId);
void HandleModuleDelete(HttpServerRequest& Request, std::string_view ModuleId);
};
diff --git a/src/zenserver/hub/hub.cpp b/src/zenserver/hub/hub.cpp
index db406947a..6c44e2333 100644
--- a/src/zenserver/hub/hub.cpp
+++ b/src/zenserver/hub/hub.cpp
@@ -11,6 +11,7 @@
#include <zencore/scopeguard.h>
#include <zencore/timer.h>
#include <zencore/workthreadpool.h>
+#include <zenhttp/httpclient.h>
ZEN_THIRD_PARTY_INCLUDES_START
#include <EASTL/fixed_vector.h>
@@ -21,7 +22,6 @@ ZEN_THIRD_PARTY_INCLUDES_END
# include <zencore/filesystem.h>
# include <zencore/testing.h>
# include <zencore/testutils.h>
-# include <zenhttp/httpclient.h>
#endif
#include <numeric>
@@ -179,7 +179,10 @@ Hub::~Hub()
try
{
// Safety call - should normally be properly Shutdown by owner
- Shutdown();
+ if (!m_ShutdownFlag.load())
+ {
+ Shutdown();
+ }
}
catch (const std::exception& e)
{
@@ -212,10 +215,13 @@ Hub::Shutdown()
}
EnumerateModules([&](std::string_view ModuleId, const InstanceInfo& Info) {
- ZEN_UNUSED(Info); // This might need to be checked to avoid spurious non-relevant warnings...
+ ZEN_UNUSED(Info);
try
{
- const Response DepResp = InternalDeprovision(std::string(ModuleId));
+ const Response DepResp = InternalDeprovision(std::string(ModuleId), [](ActiveInstance& Instance) {
+ ZEN_UNUSED(Instance);
+ return true;
+ });
if (DepResp.ResponseCode != EResponseCode::Completed && DepResp.ResponseCode != EResponseCode::Accepted)
{
ZEN_WARN("Deprovision instance for module '{}' during hub shutdown rejected: {}", ModuleId, DepResp.Message);
@@ -499,11 +505,14 @@ Hub::Response
Hub::Deprovision(const std::string& ModuleId)
{
ZEN_ASSERT(!m_ShutdownFlag.load());
- return InternalDeprovision(ModuleId);
+ return InternalDeprovision(ModuleId, [](ActiveInstance& Instance) {
+ ZEN_UNUSED(Instance);
+ return true;
+ });
}
Hub::Response
-Hub::InternalDeprovision(const std::string& ModuleId)
+Hub::InternalDeprovision(const std::string& ModuleId, std::function<bool(ActiveInstance& Instance)>&& DeprovisionGate)
{
StorageServerInstance::ExclusiveLockedPtr Instance;
size_t ActiveInstanceIndex = (size_t)-1;
@@ -521,6 +530,11 @@ Hub::InternalDeprovision(const std::string& ModuleId)
ActiveInstanceIndex = It->second;
ZEN_ASSERT(ActiveInstanceIndex < m_ActiveInstances.size());
+ if (!DeprovisionGate(m_ActiveInstances[ActiveInstanceIndex]))
+ {
+ return Response{EResponseCode::Rejected, fmt::format("Module '{}' deprovision denied by gate", ModuleId)};
+ }
+
HubInstanceState CurrentState = m_ActiveInstances[ActiveInstanceIndex].State.load();
switch (CurrentState)
@@ -1051,6 +1065,8 @@ Hub::UpdateInstanceStateLocked(size_t ActiveInstanceIndex, HubInstanceState NewS
}
return false;
}(m_ActiveInstances[ActiveInstanceIndex].State.load(), NewState));
+ m_ActiveInstances[ActiveInstanceIndex].LastKnownActivitySum.store(0);
+ m_ActiveInstances[ActiveInstanceIndex].LastActivityTime.store(std::chrono::system_clock::now());
return m_ActiveInstances[ActiveInstanceIndex].State.exchange(NewState);
}
@@ -1152,14 +1168,165 @@ Hub::AttemptRecoverInstance(std::string_view ModuleId)
}
}
+bool
+Hub::CheckInstanceStatus(HttpClient& ActivityCheckClient,
+ StorageServerInstance::SharedLockedPtr&& LockedInstance,
+ size_t ActiveInstanceIndex)
+{
+ HubInstanceState InstanceState = m_ActiveInstances[ActiveInstanceIndex].State.load();
+ if (LockedInstance.IsRunning())
+ {
+ LockedInstance.UpdateMetrics();
+ if (InstanceState == HubInstanceState::Provisioned)
+ {
+ const std::string ModuleId(LockedInstance.GetModuleId());
+
+ const uint16_t Port = LockedInstance.GetBasePort();
+ const uint64_t PreviousActivitySum = m_ActiveInstances[ActiveInstanceIndex].LastKnownActivitySum.load();
+ const std::chrono::system_clock::time_point LastActivityTime = m_ActiveInstances[ActiveInstanceIndex].LastActivityTime.load();
+
+ const std::chrono::system_clock::time_point Now = std::chrono::system_clock::now();
+
+ // We do the activity check without holding a lock to the instance
+ LockedInstance = {};
+
+ uint64_t ActivitySum = PreviousActivitySum;
+
+ std::chrono::system_clock::time_point NextCheckTime =
+ LastActivityTime + m_Config.WatchDog.ProvisionedInactivityTimeout - m_Config.WatchDog.InactivityCheckMargin;
+ if (Now >= NextCheckTime)
+ {
+ ActivityCheckClient.SetBaseUri(fmt::format("http://localhost:{}", Port));
+ HttpClient::Response Result =
+ ActivityCheckClient.Get("/stats/activity_counters", HttpClient::Accept(HttpContentType::kCbObject));
+ if (Result.IsSuccess())
+ {
+ CbObject Response = Result.AsObject();
+ if (Response)
+ {
+ ActivitySum = Response["sum"].AsUInt64();
+ }
+ }
+ }
+
+ if (ActivitySum != PreviousActivitySum)
+ {
+ m_Lock.WithSharedLock([this, InstanceState, PreviousActivitySum, &LastActivityTime, ActivitySum, Now, ModuleId]() {
+ if (auto It = m_InstanceLookup.find(ModuleId); It != m_InstanceLookup.end())
+ {
+ const uint64_t ActiveInstanceIndex = It->second;
+ ActiveInstance& Instance = m_ActiveInstances[ActiveInstanceIndex];
+
+ HubInstanceState CurrentState = Instance.State.load();
+ if (CurrentState == InstanceState)
+ {
+ if (Instance.LastActivityTime.load() == LastActivityTime &&
+ Instance.LastKnownActivitySum.load() == PreviousActivitySum)
+ {
+ Instance.LastActivityTime.store(Now);
+ Instance.LastKnownActivitySum.store(ActivitySum);
+ }
+ }
+ }
+ });
+ }
+ else if (LastActivityTime + m_Config.WatchDog.ProvisionedInactivityTimeout < Now)
+ {
+ ZEN_INFO("Instance {} has not been active for {}, attempting deprovision...",
+ ModuleId,
+ NiceTimeSpanMs(std::chrono::duration_cast<std::chrono::milliseconds>(Now - LastActivityTime).count()));
+ (void)InternalDeprovision(
+ ModuleId,
+ [ModuleId, InstanceState, LastActivityTime, PreviousActivitySum](ActiveInstance& Instance) -> bool {
+ HubInstanceState CurrentState = Instance.State.load();
+ if (CurrentState != InstanceState)
+ {
+ ZEN_INFO("Instance {} idle deprovision aborted - state changed to {}", ModuleId, ToString(CurrentState));
+ return false;
+ }
+ if (Instance.LastActivityTime.load() != LastActivityTime ||
+ Instance.LastKnownActivitySum.load() != PreviousActivitySum)
+ {
+ ZEN_INFO("Instance {} idle deprovision aborted due to activity", ModuleId);
+ return false;
+ }
+ return true;
+ });
+ }
+ }
+
+ return true;
+ }
+ else if (InstanceState == HubInstanceState::Provisioned)
+ {
+ // Process is not running but state says it should be - instance died unexpectedly.
+ const std::string ModuleId(LockedInstance.GetModuleId());
+ const uint16_t Port = LockedInstance.GetBasePort();
+ UpdateInstanceState(LockedInstance, ActiveInstanceIndex, HubInstanceState::Crashed);
+ NotifyStateUpdate(ModuleId, HubInstanceState::Provisioned, HubInstanceState::Crashed, Port, {});
+ LockedInstance = {};
+
+ return false;
+ }
+ else if (InstanceState == HubInstanceState::Hibernated)
+ {
+ // Process is not running - no HTTP activity check is possible.
+ // Use a pure time-based check; the margin window does not apply here.
+ const std::string ModuleId = std::string(LockedInstance.GetModuleId());
+ const std::chrono::system_clock::time_point LastActivityTime = m_ActiveInstances[ActiveInstanceIndex].LastActivityTime.load();
+ const uint64_t PreviousActivitySum = m_ActiveInstances[ActiveInstanceIndex].LastKnownActivitySum.load();
+ const std::chrono::system_clock::time_point Now = std::chrono::system_clock::now();
+ LockedInstance = {};
+
+ if (LastActivityTime + m_Config.WatchDog.HibernatedInactivityTimeout < Now)
+ {
+ ZEN_INFO("Hibernated instance {} has not been active for {}, attempting deprovision...",
+ ModuleId,
+ NiceTimeSpanMs(std::chrono::duration_cast<std::chrono::milliseconds>(Now - LastActivityTime).count()));
+ (void)InternalDeprovision(
+ ModuleId,
+ [ModuleId, InstanceState, LastActivityTime, PreviousActivitySum](ActiveInstance& Instance) -> bool {
+ HubInstanceState CurrentState = Instance.State.load();
+ if (CurrentState != InstanceState)
+ {
+ ZEN_INFO("Hibernated instance {} idle deprovision aborted - state changed to {}", ModuleId, ToString(CurrentState));
+ return false;
+ }
+ if (Instance.LastActivityTime.load() != LastActivityTime || Instance.LastKnownActivitySum.load() != PreviousActivitySum)
+ {
+ ZEN_INFO("Hibernated instance {} idle deprovision aborted due to activity", ModuleId);
+ return false;
+ }
+ return true;
+ });
+ }
+ return true;
+ }
+ else
+ {
+ // transitional state (Provisioning, Deprovisioning, Hibernating, Waking, Recovering) - expected, skip.
+ // Crashed is handled above via AttemptRecoverInstance; it appears here only when the instance
+ // lock was busy on a previous cycle and recovery is already pending.
+ return true;
+ }
+}
+
void
Hub::WatchDog()
{
- constexpr uint64_t WatchDogWakeupTimeMs = 3000;
- constexpr uint64_t WatchDogProcessingTimeMs = 500;
+ const uint64_t CycleIntervalMs = std::chrono::duration_cast<std::chrono::milliseconds>(m_Config.WatchDog.CycleInterval).count();
+ const uint64_t CycleProcessingBudgetMs =
+ std::chrono::duration_cast<std::chrono::milliseconds>(m_Config.WatchDog.CycleProcessingBudget).count();
+ const uint64_t InstanceCheckThrottleMs =
+ std::chrono::duration_cast<std::chrono::milliseconds>(m_Config.WatchDog.InstanceCheckThrottle).count();
+
+ HttpClient ActivityCheckClient("http://localhost",
+ HttpClientSettings{.ConnectTimeout = m_Config.WatchDog.ActivityCheckConnectTimeout,
+ .Timeout = m_Config.WatchDog.ActivityCheckRequestTimeout},
+ [&]() -> bool { return m_WatchDogEvent.Wait(0); });
size_t CheckInstanceIndex = SIZE_MAX; // first increment wraps to 0
- while (!m_WatchDogEvent.Wait(WatchDogWakeupTimeMs))
+ while (!m_WatchDogEvent.Wait(gsl::narrow<int>(CycleIntervalMs)))
{
try
{
@@ -1169,7 +1336,7 @@ Hub::WatchDog()
Stopwatch Timer;
bool ShuttingDown = false;
- while (SlotsRemaining > 0 && Timer.GetElapsedTimeMs() < WatchDogProcessingTimeMs && !ShuttingDown)
+ while (SlotsRemaining > 0 && Timer.GetElapsedTimeMs() < CycleProcessingBudgetMs && !ShuttingDown)
{
StorageServerInstance::SharedLockedPtr LockedInstance;
m_Lock.WithSharedLock([this, &CheckInstanceIndex, &LockedInstance, &SlotsRemaining]() {
@@ -1197,27 +1364,18 @@ Hub::WatchDog()
continue;
}
- if (LockedInstance.IsRunning())
+ std::string ModuleId(LockedInstance.GetModuleId());
+
+ bool InstanceIsOk = CheckInstanceStatus(ActivityCheckClient, std::move(LockedInstance), CheckInstanceIndex);
+ if (InstanceIsOk)
{
- LockedInstance.UpdateMetrics();
+ ShuttingDown = m_WatchDogEvent.Wait(gsl::narrow<int>(InstanceCheckThrottleMs));
}
- else if (m_ActiveInstances[CheckInstanceIndex].State.load() == HubInstanceState::Provisioned)
+ else
{
- // Process is not running but state says it should be - instance died unexpectedly.
- const std::string ModuleId(LockedInstance.GetModuleId());
- const uint16_t Port = LockedInstance.GetBasePort();
- UpdateInstanceState(LockedInstance, CheckInstanceIndex, HubInstanceState::Crashed);
- NotifyStateUpdate(ModuleId, HubInstanceState::Provisioned, HubInstanceState::Crashed, Port, {});
- LockedInstance = {};
+ ZEN_WARN("Instance for module '{}' is not running, attempting recovery", ModuleId);
AttemptRecoverInstance(ModuleId);
}
- // else: transitional state (Provisioning, Deprovisioning, Hibernating, Waking, Recovering) - expected, skip.
- // Crashed is handled above via AttemptRecoverInstance; it appears here only when the instance
- // lock was busy on a previous cycle and recovery is already pending.
- LockedInstance = {};
-
- // Rate-limit: pause briefly between live-instance checks and respond to shutdown.
- ShuttingDown = m_WatchDogEvent.Wait(5);
}
}
catch (const std::exception& Ex)
@@ -1306,7 +1464,7 @@ namespace hub_testutils {
// Poll until Find() returns false for the given module (i.e. async deprovision completes).
static bool WaitForInstanceGone(Hub& HubInstance,
std::string_view ModuleId,
- std::chrono::milliseconds PollInterval = std::chrono::milliseconds(200),
+ std::chrono::milliseconds PollInterval = std::chrono::milliseconds(50),
std::chrono::seconds Timeout = std::chrono::seconds(30))
{
const auto Deadline = std::chrono::steady_clock::now() + Timeout;
@@ -1324,7 +1482,7 @@ namespace hub_testutils {
// Poll until GetInstanceCount() reaches ExpectedCount (i.e. all async deprovisions complete).
static bool WaitForInstanceCount(Hub& HubInstance,
int ExpectedCount,
- std::chrono::milliseconds PollInterval = std::chrono::milliseconds(200),
+ std::chrono::milliseconds PollInterval = std::chrono::milliseconds(50),
std::chrono::seconds Timeout = std::chrono::seconds(30))
{
const auto Deadline = std::chrono::steady_clock::now() + Timeout;
@@ -1867,7 +2025,7 @@ TEST_CASE("hub.async_hibernate_wake")
HubProvisionedInstanceInfo ProvInfo;
Hub::InstanceInfo Info;
- constexpr auto kPollInterval = std::chrono::milliseconds(200);
+ constexpr auto kPollInterval = std::chrono::milliseconds(50);
constexpr auto kTimeout = std::chrono::seconds(30);
// Provision and wait until Provisioned
@@ -1961,7 +2119,12 @@ TEST_CASE("hub.recover_process_crash")
CaptureMutex.WithExclusiveLock([&]() { Transitions.push_back({OldState, NewState}); });
};
- std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), {}, std::move(CaptureFunc));
+ // Fast watchdog cycle so crash detection is near-instant instead of waiting up to the 3s default.
+ Hub::Configuration Config;
+ Config.WatchDog.CycleInterval = std::chrono::milliseconds(10);
+ Config.WatchDog.InstanceCheckThrottle = std::chrono::milliseconds(1);
+
+ std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config, std::move(CaptureFunc));
HubProvisionedInstanceInfo Info;
{
@@ -1973,8 +2136,8 @@ TEST_CASE("hub.recover_process_crash")
// recovers the instance, and the new process is serving requests.
HubInstance->TerminateModuleForTesting("module_a");
- constexpr auto kPollIntervalMs = std::chrono::milliseconds(200);
- constexpr auto kTimeoutMs = std::chrono::seconds(20);
+ constexpr auto kPollIntervalMs = std::chrono::milliseconds(50);
+ constexpr auto kTimeoutMs = std::chrono::seconds(15);
const auto Deadline = std::chrono::steady_clock::now() + kTimeoutMs;
// A successful HTTP health check on the same port confirms the new process is up.
@@ -2020,7 +2183,13 @@ TEST_CASE("hub.recover_process_crash")
TEST_CASE("hub.recover_process_crash_then_deprovision")
{
ScopedTemporaryDirectory TempDir;
- std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path());
+
+ // Fast watchdog cycle so crash detection is near-instant instead of waiting up to the 3s default.
+ Hub::Configuration Config;
+ Config.WatchDog.CycleInterval = std::chrono::milliseconds(10);
+ Config.WatchDog.InstanceCheckThrottle = std::chrono::milliseconds(1);
+
+ std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config);
HubProvisionedInstanceInfo Info;
{
@@ -2031,8 +2200,8 @@ TEST_CASE("hub.recover_process_crash_then_deprovision")
// Kill the child process, wait for the watchdog to detect and recover the instance.
HubInstance->TerminateModuleForTesting("module_a");
- constexpr auto kPollIntervalMs = std::chrono::milliseconds(200);
- constexpr auto kTimeoutMs = std::chrono::seconds(20);
+ constexpr auto kPollIntervalMs = std::chrono::milliseconds(50);
+ constexpr auto kTimeoutMs = std::chrono::seconds(15);
const auto Deadline = std::chrono::steady_clock::now() + kTimeoutMs;
bool Recovered = false;
@@ -2108,7 +2277,7 @@ TEST_CASE("hub.async_provision_concurrent")
}
// Poll until all instances reach Provisioned state
- constexpr auto kPollInterval = std::chrono::milliseconds(200);
+ constexpr auto kPollInterval = std::chrono::milliseconds(50);
constexpr auto kTimeout = std::chrono::seconds(30);
const auto Deadline = std::chrono::steady_clock::now() + kTimeout;
@@ -2209,6 +2378,113 @@ TEST_CASE("hub.async_provision_rejected")
CHECK_EQ(HubInstance->GetInstanceCount(), 1);
}
+TEST_CASE("hub.instance.inactivity.deprovision")
+{
+ ScopedTemporaryDirectory TempDir;
+
+ // Aggressive watchdog settings to keep test duration short.
+ // Provisioned timeout (2s) > Hibernated timeout (1s) - this is the key invariant under test.
+ // Margin (1s) means the HTTP activity check fires at LastActivityTime+1s for Provisioned instances.
+ // The Hibernated branch ignores the margin and uses a direct time-based check.
+ Hub::Configuration Config;
+ Config.BasePortNumber = 23200;
+ Config.InstanceLimit = 3;
+ Config.WatchDog.CycleInterval = std::chrono::milliseconds(10);
+ Config.WatchDog.InstanceCheckThrottle = std::chrono::milliseconds(1);
+ Config.WatchDog.ProvisionedInactivityTimeout = std::chrono::seconds(2);
+ Config.WatchDog.HibernatedInactivityTimeout = std::chrono::seconds(1);
+ Config.WatchDog.InactivityCheckMargin = std::chrono::seconds(1);
+ Config.WatchDog.ActivityCheckConnectTimeout = std::chrono::milliseconds(200);
+ Config.WatchDog.ActivityCheckRequestTimeout = std::chrono::milliseconds(500);
+
+ std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config);
+
+ // Provision in order: idle first, idle_hib second (then hibernate), persistent last.
+ // idle_hib uses the shorter Hibernated timeout (1s) and expires before idle (2s provisioned).
+ // persistent gets real HTTP PUTs so its activity timer is reset; it must still be alive
+ // after both idle instances are gone.
+
+ HubProvisionedInstanceInfo IdleInfo;
+ {
+ const Hub::Response R = HubInstance->Provision("idle", IdleInfo);
+ REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message);
+ }
+
+ HubProvisionedInstanceInfo IdleHibInfo;
+ {
+ const Hub::Response R = HubInstance->Provision("idle_hib", IdleHibInfo);
+ REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message);
+ const Hub::Response H = HubInstance->Hibernate("idle_hib");
+ REQUIRE_MESSAGE(H.ResponseCode == Hub::EResponseCode::Completed, H.Message);
+ }
+
+ HubProvisionedInstanceInfo PersistentInfo;
+ {
+ const Hub::Response R = HubInstance->Provision("persistent", PersistentInfo);
+ REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message);
+ }
+
+ auto PokeInstance = [&](uint16_t Port) {
+ // Make a real storage request to increment the instance's activity sum.
+ // The watchdog detects the changed sum on the next cycle and resets LastActivityTime.
+ {
+ HttpClient PersistentClient(fmt::format("http://localhost:{}", Port),
+ HttpClientSettings{.ConnectTimeout = std::chrono::milliseconds(200)});
+ uint64_t Tick = std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::steady_clock::now() -
+ std::chrono::steady_clock::time_point::min())
+ .count();
+ IoHash Key = IoHash::HashBuffer(&Tick, sizeof(Tick));
+ const HttpClient::Response PutResult =
+ PersistentClient.Put(fmt::format("/z$/ns1/b/{}", Key),
+ IoBufferBuilder::MakeFromMemory(MakeMemoryView(std::string_view("keepalive"))));
+ CHECK(PutResult);
+ }
+ };
+
+ PokeInstance(IdleInfo.Port);
+ PokeInstance(PersistentInfo.Port);
+
+ Sleep(100);
+
+ // Phase 1: immediately after setup all three instances must still be alive.
+ // No timeout has elapsed yet (only 100ms have passed).
+ CHECK_MESSAGE(HubInstance->Find("idle"), "idle was deprovisioned within 100ms - its 2s provisioned timeout has not elapsed");
+
+ CHECK_MESSAGE(HubInstance->Find("idle_hib"), "idle_hib was deprovisioned within 100ms - its 1s hibernated timeout has not elapsed");
+
+ CHECK_MESSAGE(HubInstance->Find("persistent"),
+ "persistent was deprovisioned within 100ms - its 2s provisioned timeout has not elapsed");
+
+ // Phase 2: idle_hib must be deprovisioned by the watchdog within its 1s hibernated timeout.
+ // idle must remain alive - its 2s provisioned timeout has not elapsed yet.
+ CHECK_MESSAGE(hub_testutils::WaitForInstanceGone(*HubInstance, "idle_hib", std::chrono::milliseconds(100), std::chrono::seconds(3)),
+ "idle_hib was not deprovisioned within its 1s hibernated timeout");
+
+ CHECK_MESSAGE(!HubInstance->Find("idle_hib"), "idle_hib should be gone after its 1s hibernated timeout elapsed");
+
+ CHECK_MESSAGE(HubInstance->Find("idle"),
+ "idle was deprovisioned before its 2s provisioned timeout - only idle_hib's 1s hibernated timeout has elapsed");
+
+ CHECK_MESSAGE(HubInstance->Find("persistent"),
+ "persistent was incorrectly deprovisioned - its activity timer was reset by PokeInstance");
+
+ PokeInstance(PersistentInfo.Port);
+
+ // Phase 3: idle must be deprovisioned by the watchdog within its 2s provisioned timeout.
+ // persistent must remain alive - its activity timer was reset by PokeInstance.
+ CHECK_MESSAGE(hub_testutils::WaitForInstanceGone(*HubInstance, "idle", std::chrono::milliseconds(100), std::chrono::seconds(4)),
+ "idle was not deprovisioned within its 2s provisioned timeout");
+
+ CHECK_MESSAGE(!HubInstance->Find("idle_hib"), "idle_hib should still be gone - it was deprovisioned in phase 2");
+
+ CHECK_MESSAGE(!HubInstance->Find("idle"), "idle should be gone after its 3s provisioned timeout elapsed");
+
+ CHECK_MESSAGE(HubInstance->Find("persistent"),
+ "persistent was incorrectly deprovisioned - its activity timer was reset by PokeInstance");
+
+ HubInstance->Shutdown();
+}
+
TEST_SUITE_END();
void
diff --git a/src/zenserver/hub/hub.h b/src/zenserver/hub/hub.h
index eb2a06587..c343b19e2 100644
--- a/src/zenserver/hub/hub.h
+++ b/src/zenserver/hub/hub.h
@@ -9,16 +9,17 @@
#include <zencore/system.h>
#include <zenutil/zenserverprocess.h>
+#include <chrono>
#include <deque>
#include <filesystem>
#include <functional>
#include <memory>
#include <thread>
#include <unordered_map>
-#include <unordered_set>
namespace zen {
+class HttpClient;
class WorkerThreadPool;
/**
@@ -36,6 +37,19 @@ struct HubProvisionedInstanceInfo
class Hub
{
public:
+ struct WatchDogConfiguration
+ {
+ std::chrono::milliseconds CycleInterval = std::chrono::seconds(3);
+ std::chrono::milliseconds CycleProcessingBudget = std::chrono::milliseconds(500);
+ std::chrono::milliseconds InstanceCheckThrottle = std::chrono::milliseconds(5);
+ std::chrono::seconds ProvisionedInactivityTimeout = std::chrono::minutes(10);
+ std::chrono::seconds HibernatedInactivityTimeout = std::chrono::minutes(30);
+ std::chrono::seconds InactivityCheckMargin = std::chrono::minutes(1);
+
+ std::chrono::milliseconds ActivityCheckConnectTimeout = std::chrono::milliseconds(100);
+ std::chrono::milliseconds ActivityCheckRequestTimeout = std::chrono::milliseconds(200);
+ };
+
struct Configuration
{
/** Enable or disable the use of a Windows Job Object for child process management.
@@ -52,6 +66,8 @@ public:
int InstanceCoreLimit = 0; // Automatic
std::filesystem::path InstanceConfigPath;
std::string HydrationTargetSpecification;
+
+ WatchDogConfiguration WatchDog;
};
typedef std::function<
@@ -177,11 +193,15 @@ private:
std::unique_ptr<StorageServerInstance> Instance;
std::atomic<HubInstanceState> State = HubInstanceState::Unprovisioned;
// TODO: We should move current metrics here (from StorageServerInstance)
+
+ // Read and updated by WatchDog, updates to State triggers a reset of both
+ std::atomic<uint64_t> LastKnownActivitySum = 0;
+ std::atomic<std::chrono::system_clock::time_point> LastActivityTime = std::chrono::system_clock::time_point::min();
};
// UpdateInstanceState is overloaded to accept a locked instance pointer (exclusive or shared) or the hub exclusive
// lock scope as a proof token that the caller holds an appropriate lock before mutating ActiveInstance::State.
- // State mutation and notification (NotifyStateUpdate) are intentionally decoupled — see NotifyStateUpdate below.
+ // State mutation and notification (NotifyStateUpdate) are intentionally decoupled - see NotifyStateUpdate below.
HubInstanceState UpdateInstanceState(const StorageServerInstance::ExclusiveLockedPtr& Instance,
size_t ActiveInstanceIndex,
@@ -213,6 +233,9 @@ private:
Event m_WatchDogEvent;
void WatchDog();
+ bool CheckInstanceStatus(HttpClient& ActivityHttpClient,
+ StorageServerInstance::SharedLockedPtr&& LockedInstance,
+ size_t ActiveInstanceIndex);
void AttemptRecoverInstance(std::string_view ModuleId);
void UpdateStats();
@@ -220,7 +243,7 @@ private:
bool CanProvisionInstance(std::string_view ModuleId, std::string& OutReason);
uint16_t GetInstanceIndexAssignedPort(size_t ActiveInstanceIndex) const;
- Response InternalDeprovision(const std::string& ModuleId);
+ Response InternalDeprovision(const std::string& ModuleId, std::function<bool(ActiveInstance& Instance)>&& DeprovisionGate);
void CompleteProvision(StorageServerInstance::ExclusiveLockedPtr& Instance,
size_t ActiveInstanceIndex,
HubInstanceState OldState,
diff --git a/src/zenserver/hub/zenhubserver.cpp b/src/zenserver/hub/zenhubserver.cpp
index 269de28c2..314031246 100644
--- a/src/zenserver/hub/zenhubserver.cpp
+++ b/src/zenserver/hub/zenhubserver.cpp
@@ -147,6 +147,62 @@ ZenHubServerConfigurator::AddCliOptions(cxxopts::Options& Options)
cxxopts::value<bool>(m_ServerOptions.HubUseJobObject)->default_value("true"),
"");
#endif // ZEN_PLATFORM_WINDOWS
+
+ Options.add_option("hub",
+ "",
+ "hub-watchdog-cycle-interval-ms",
+ "Interval between watchdog cycles in milliseconds",
+ cxxopts::value<uint32_t>(m_ServerOptions.WatchdogConfig.CycleIntervalMs)->default_value("3000"),
+ "<ms>");
+
+ Options.add_option("hub",
+ "",
+ "hub-watchdog-cycle-processing-budget-ms",
+ "Maximum processing time budget per watchdog cycle in milliseconds",
+ cxxopts::value<uint32_t>(m_ServerOptions.WatchdogConfig.CycleProcessingBudgetMs)->default_value("500"),
+ "<ms>");
+
+ Options.add_option("hub",
+ "",
+ "hub-watchdog-instance-check-throttle-ms",
+ "Delay between checking successive instances per watchdog cycle in milliseconds",
+ cxxopts::value<uint32_t>(m_ServerOptions.WatchdogConfig.InstanceCheckThrottleMs)->default_value("5"),
+ "<ms>");
+
+ Options.add_option("hub",
+ "",
+ "hub-watchdog-provisioned-inactivity-timeout-seconds",
+ "Seconds of inactivity after which a provisioned instance is deprovisioned",
+ cxxopts::value<uint32_t>(m_ServerOptions.WatchdogConfig.ProvisionedInactivityTimeoutSeconds)->default_value("600"),
+ "<seconds>");
+
+ Options.add_option("hub",
+ "",
+ "hub-watchdog-hibernated-inactivity-timeout-seconds",
+ "Seconds of inactivity after which a hibernated instance is deprovisioned",
+ cxxopts::value<uint32_t>(m_ServerOptions.WatchdogConfig.HibernatedInactivityTimeoutSeconds)->default_value("1800"),
+ "<seconds>");
+
+ Options.add_option("hub",
+ "",
+ "hub-watchdog-inactivity-check-margin-seconds",
+ "Margin in seconds subtracted from inactivity timeout before triggering an activity check",
+ cxxopts::value<uint32_t>(m_ServerOptions.WatchdogConfig.InactivityCheckMarginSeconds)->default_value("60"),
+ "<seconds>");
+
+ Options.add_option("hub",
+ "",
+ "hub-watchdog-activity-check-connect-timeout-ms",
+ "Connect timeout in milliseconds for instance activity check requests",
+ cxxopts::value<uint32_t>(m_ServerOptions.WatchdogConfig.ActivityCheckConnectTimeoutMs)->default_value("100"),
+ "<ms>");
+
+ Options.add_option("hub",
+ "",
+ "hub-watchdog-activity-check-request-timeout-ms",
+ "Request timeout in milliseconds for instance activity check requests",
+ cxxopts::value<uint32_t>(m_ServerOptions.WatchdogConfig.ActivityCheckRequestTimeoutMs)->default_value("200"),
+ "<ms>");
}
void
@@ -320,17 +376,27 @@ ZenHubServer::InitializeState(const ZenHubServerConfig& ServerConfig)
void
ZenHubServer::InitializeServices(const ZenHubServerConfig& ServerConfig)
{
- ZEN_UNUSED(ServerConfig);
-
ZEN_INFO("instantiating Hub");
m_Hub = std::make_unique<Hub>(
- Hub::Configuration{.UseJobObject = ServerConfig.HubUseJobObject,
- .BasePortNumber = ServerConfig.HubBasePortNumber,
- .InstanceLimit = ServerConfig.HubInstanceLimit,
- .InstanceHttpThreadCount = ServerConfig.HubInstanceHttpThreadCount,
- .InstanceCoreLimit = ServerConfig.HubInstanceCoreLimit,
- .InstanceConfigPath = ServerConfig.HubInstanceConfigPath,
- .HydrationTargetSpecification = ServerConfig.HydrationTargetSpecification},
+ Hub::Configuration{
+ .UseJobObject = ServerConfig.HubUseJobObject,
+ .BasePortNumber = ServerConfig.HubBasePortNumber,
+ .InstanceLimit = ServerConfig.HubInstanceLimit,
+ .InstanceHttpThreadCount = ServerConfig.HubInstanceHttpThreadCount,
+ .InstanceCoreLimit = ServerConfig.HubInstanceCoreLimit,
+ .InstanceConfigPath = ServerConfig.HubInstanceConfigPath,
+ .HydrationTargetSpecification = ServerConfig.HydrationTargetSpecification,
+ .WatchDog =
+ {
+ .CycleInterval = std::chrono::milliseconds(ServerConfig.WatchdogConfig.CycleIntervalMs),
+ .CycleProcessingBudget = std::chrono::milliseconds(ServerConfig.WatchdogConfig.CycleProcessingBudgetMs),
+ .InstanceCheckThrottle = std::chrono::milliseconds(ServerConfig.WatchdogConfig.InstanceCheckThrottleMs),
+ .ProvisionedInactivityTimeout = std::chrono::seconds(ServerConfig.WatchdogConfig.ProvisionedInactivityTimeoutSeconds),
+ .HibernatedInactivityTimeout = std::chrono::seconds(ServerConfig.WatchdogConfig.HibernatedInactivityTimeoutSeconds),
+ .InactivityCheckMargin = std::chrono::seconds(ServerConfig.WatchdogConfig.InactivityCheckMarginSeconds),
+ .ActivityCheckConnectTimeout = std::chrono::milliseconds(ServerConfig.WatchdogConfig.ActivityCheckConnectTimeoutMs),
+ .ActivityCheckRequestTimeout = std::chrono::milliseconds(ServerConfig.WatchdogConfig.ActivityCheckRequestTimeoutMs),
+ }},
ZenServerEnvironment(ZenServerEnvironment::Hub,
ServerConfig.DataDir / "hub",
ServerConfig.DataDir / "servers",
@@ -349,10 +415,10 @@ ZenHubServer::InitializeServices(const ZenHubServerConfig& ServerConfig)
m_ApiService = std::make_unique<zen::HttpApiService>(*m_Http);
ZEN_INFO("instantiating hub service");
- m_HubService = std::make_unique<HttpHubService>(*m_Hub);
+ m_HubService = std::make_unique<HttpHubService>(*m_Hub, m_StatsService, m_StatusService);
m_HubService->SetNotificationEndpoint(ServerConfig.UpstreamNotificationEndpoint, ServerConfig.InstanceId);
- m_FrontendService = std::make_unique<HttpFrontendService>(m_ContentRoot, m_StatusService);
+ m_FrontendService = std::make_unique<HttpFrontendService>(m_ContentRoot, m_StatsService, m_StatusService);
}
void
diff --git a/src/zenserver/hub/zenhubserver.h b/src/zenserver/hub/zenhubserver.h
index c4a45a8fe..77df3eaa3 100644
--- a/src/zenserver/hub/zenhubserver.h
+++ b/src/zenserver/hub/zenhubserver.h
@@ -20,6 +20,18 @@ class HttpApiService;
class HttpFrontendService;
class HttpHubService;
+struct ZenHubWatchdogConfig
+{
+ uint32_t CycleIntervalMs = 3000;
+ uint32_t CycleProcessingBudgetMs = 500;
+ uint32_t InstanceCheckThrottleMs = 5;
+ uint32_t ProvisionedInactivityTimeoutSeconds = 600;
+ uint32_t HibernatedInactivityTimeoutSeconds = 1800;
+ uint32_t InactivityCheckMarginSeconds = 60; // Activity check is triggered this far before the inactivity timeout
+ uint32_t ActivityCheckConnectTimeoutMs = 100;
+ uint32_t ActivityCheckRequestTimeoutMs = 200;
+};
+
struct ZenHubServerConfig : public ZenServerConfig
{
std::string UpstreamNotificationEndpoint;
@@ -36,6 +48,7 @@ struct ZenHubServerConfig : public ZenServerConfig
int HubInstanceCoreLimit = 0; // Automatic
std::filesystem::path HubInstanceConfigPath; // Path to Lua config file
std::string HydrationTargetSpecification; // hydration/dehydration target specification
+ ZenHubWatchdogConfig WatchdogConfig;
};
class Hub;
diff --git a/src/zenserver/proxy/httpproxystats.cpp b/src/zenserver/proxy/httpproxystats.cpp
index 6aa3e5c9b..337be2417 100644
--- a/src/zenserver/proxy/httpproxystats.cpp
+++ b/src/zenserver/proxy/httpproxystats.cpp
@@ -140,6 +140,12 @@ HttpProxyStatsService::HandleRecordStatus(HttpServerRequest& Request)
Request.WriteResponse(HttpResponseCode::OK, Cbo.Save());
}
+void
+HttpProxyStatsService::HandleStatsRequest(HttpServerRequest& Request)
+{
+ Request.WriteResponse(HttpResponseCode::OK, CollectStats());
+}
+
CbObject
HttpProxyStatsService::CollectStats()
{
@@ -225,10 +231,4 @@ HttpProxyStatsService::CollectStats()
return Cbo.Save();
}
-void
-HttpProxyStatsService::HandleStatsRequest(HttpServerRequest& Request)
-{
- Request.WriteResponse(HttpResponseCode::OK, CollectStats());
-}
-
} // namespace zen
diff --git a/src/zenserver/proxy/zenproxyserver.cpp b/src/zenserver/proxy/zenproxyserver.cpp
index cf84c159a..7e59a7b7e 100644
--- a/src/zenserver/proxy/zenproxyserver.cpp
+++ b/src/zenserver/proxy/zenproxyserver.cpp
@@ -324,7 +324,7 @@ ZenProxyServer::Initialize(const ZenProxyServerConfig& ServerConfig, ZenServerSt
m_ApiService = std::make_unique<HttpApiService>(*m_Http);
m_Http->RegisterService(*m_ApiService);
- m_FrontendService = std::make_unique<HttpFrontendService>(m_ContentRoot, m_StatusService);
+ m_FrontendService = std::make_unique<HttpFrontendService>(m_ContentRoot, m_StatsService, m_StatusService);
m_Http->RegisterService(*m_FrontendService);
std::string DefaultRecordDir = (m_DataRoot / "recordings").string();
diff --git a/src/zenserver/sessions/httpsessions.cpp b/src/zenserver/sessions/httpsessions.cpp
index 429ba98cf..fdf2e1f21 100644
--- a/src/zenserver/sessions/httpsessions.cpp
+++ b/src/zenserver/sessions/httpsessions.cpp
@@ -49,6 +49,21 @@ HttpSessionsService::HandleRequest(HttpServerRequest& Request)
}
}
+void
+HttpSessionsService::HandleStatusRequest(HttpServerRequest& Request)
+{
+ ZEN_TRACE_CPU("HttpSessionsService::Status");
+ CbObjectWriter Cbo;
+ Cbo << "ok" << true;
+ Request.WriteResponse(HttpResponseCode::OK, Cbo.Save());
+}
+
+void
+HttpSessionsService::HandleStatsRequest(HttpServerRequest& HttpReq)
+{
+ HttpReq.WriteResponse(HttpResponseCode::OK, CollectStats());
+}
+
CbObject
HttpSessionsService::CollectStats()
{
@@ -72,19 +87,10 @@ HttpSessionsService::CollectStats()
return Cbo.Save();
}
-void
-HttpSessionsService::HandleStatsRequest(HttpServerRequest& HttpReq)
+uint64_t
+HttpSessionsService::GetActivityCounter()
{
- HttpReq.WriteResponse(HttpResponseCode::OK, CollectStats());
-}
-
-void
-HttpSessionsService::HandleStatusRequest(HttpServerRequest& Request)
-{
- ZEN_TRACE_CPU("HttpSessionsService::Status");
- CbObjectWriter Cbo;
- Cbo << "ok" << true;
- Request.WriteResponse(HttpResponseCode::OK, Cbo.Save());
+ return m_HttpRequests.Count();
}
void
diff --git a/src/zenserver/sessions/httpsessions.h b/src/zenserver/sessions/httpsessions.h
index a5783a46b..86a23f835 100644
--- a/src/zenserver/sessions/httpsessions.h
+++ b/src/zenserver/sessions/httpsessions.h
@@ -29,9 +29,10 @@ public:
virtual const char* BaseUri() const override;
virtual void HandleRequest(HttpServerRequest& Request) override;
- virtual CbObject CollectStats() override;
- virtual void HandleStatsRequest(HttpServerRequest& Request) override;
virtual void HandleStatusRequest(HttpServerRequest& Request) override;
+ virtual void HandleStatsRequest(HttpServerRequest& Request) override;
+ virtual CbObject CollectStats() override;
+ virtual uint64_t GetActivityCounter() override;
void SetSelfSessionId(const Oid& Id) { m_SelfSessionId = Id; }
diff --git a/src/zenserver/storage/admin/admin.h b/src/zenserver/storage/admin/admin.h
index ee3da4579..361153e42 100644
--- a/src/zenserver/storage/admin/admin.h
+++ b/src/zenserver/storage/admin/admin.h
@@ -13,7 +13,7 @@ class JobQueue;
class ZenCacheStore;
struct ZenServerConfig;
-class HttpAdminService : public zen::HttpService
+class HttpAdminService : public HttpService
{
public:
struct LogPaths
@@ -31,7 +31,7 @@ public:
~HttpAdminService();
virtual const char* BaseUri() const override;
- virtual void HandleRequest(zen::HttpServerRequest& Request) override;
+ virtual void HandleRequest(HttpServerRequest& Request) override;
private:
HttpRequestRouter m_Router;
diff --git a/src/zenserver/storage/buildstore/httpbuildstore.cpp b/src/zenserver/storage/buildstore/httpbuildstore.cpp
index de9589078..bbbb0c37b 100644
--- a/src/zenserver/storage/buildstore/httpbuildstore.cpp
+++ b/src/zenserver/storage/buildstore/httpbuildstore.cpp
@@ -605,6 +605,26 @@ HttpBuildStoreService::BlobsExistsRequest(HttpRouterRequest& Req)
return ServerRequest.WriteResponse(HttpResponseCode::OK, ResponseObject);
}
+void
+HttpBuildStoreService::HandleStatusRequest(HttpServerRequest& Request)
+{
+ ZEN_TRACE_CPU("HttpBuildStoreService::Status");
+ CbObjectWriter Cbo;
+ Cbo << "ok" << true;
+ Cbo.BeginObject("capabilities");
+ {
+ Cbo << "maxrangecountperrequest" << MaxRangeCountPerRequestSupported;
+ }
+ Cbo.EndObject(); // capabilities
+ Request.WriteResponse(HttpResponseCode::OK, Cbo.Save());
+}
+
+void
+HttpBuildStoreService::HandleStatsRequest(HttpServerRequest& Request)
+{
+ Request.WriteResponse(HttpResponseCode::OK, CollectStats());
+}
+
CbObject
HttpBuildStoreService::CollectStats()
{
@@ -663,24 +683,10 @@ HttpBuildStoreService::CollectStats()
return Cbo.Save();
}
-void
-HttpBuildStoreService::HandleStatsRequest(HttpServerRequest& Request)
-{
- Request.WriteResponse(HttpResponseCode::OK, CollectStats());
-}
-
-void
-HttpBuildStoreService::HandleStatusRequest(HttpServerRequest& Request)
+uint64_t
+HttpBuildStoreService::GetActivityCounter()
{
- ZEN_TRACE_CPU("HttpBuildStoreService::Status");
- CbObjectWriter Cbo;
- Cbo << "ok" << true;
- Cbo.BeginObject("capabilities");
- {
- Cbo << "maxrangecountperrequest" << MaxRangeCountPerRequestSupported;
- }
- Cbo.EndObject(); // capabilities
- Request.WriteResponse(HttpResponseCode::OK, Cbo.Save());
+ return m_HttpRequests.Count();
}
} // namespace zen
diff --git a/src/zenserver/storage/buildstore/httpbuildstore.h b/src/zenserver/storage/buildstore/httpbuildstore.h
index 2a09b71cf..864d12edc 100644
--- a/src/zenserver/storage/buildstore/httpbuildstore.h
+++ b/src/zenserver/storage/buildstore/httpbuildstore.h
@@ -13,18 +13,19 @@ namespace zen {
class BuildStore;
-class HttpBuildStoreService final : public zen::HttpService, public IHttpStatusProvider, public IHttpStatsProvider
+class HttpBuildStoreService final : public HttpService, public IHttpStatusProvider, public IHttpStatsProvider
{
public:
HttpBuildStoreService(HttpStatusService& StatusService, HttpStatsService& StatsService, BuildStore& Store);
virtual ~HttpBuildStoreService();
virtual const char* BaseUri() const override;
- virtual void HandleRequest(zen::HttpServerRequest& Request) override;
+ virtual void HandleRequest(HttpServerRequest& Request) override;
- virtual CbObject CollectStats() override;
- virtual void HandleStatsRequest(HttpServerRequest& Request) override;
virtual void HandleStatusRequest(HttpServerRequest& Request) override;
+ virtual void HandleStatsRequest(HttpServerRequest& Request) override;
+ virtual CbObject CollectStats() override;
+ virtual uint64_t GetActivityCounter() override;
private:
struct BuildStoreStats
diff --git a/src/zenserver/storage/cache/httpstructuredcache.cpp b/src/zenserver/storage/cache/httpstructuredcache.cpp
index bbdb03ba4..81e244aff 100644
--- a/src/zenserver/storage/cache/httpstructuredcache.cpp
+++ b/src/zenserver/storage/cache/httpstructuredcache.cpp
@@ -1827,113 +1827,12 @@ HttpStructuredCacheService::HandleRpcRequest(HttpServerRequest& Request, std::st
}
}
-CbObject
-HttpStructuredCacheService::CollectStats()
+void
+HttpStructuredCacheService::HandleStatusRequest(HttpServerRequest& Request)
{
- ZEN_MEMSCOPE(GetCacheHttpTag());
-
CbObjectWriter Cbo;
-
- EmitSnapshot("requests", m_HttpRequests, Cbo);
-
- const uint64_t HitCount = m_CacheStats.HitCount;
- const uint64_t UpstreamHitCount = m_CacheStats.UpstreamHitCount;
- const uint64_t MissCount = m_CacheStats.MissCount;
- const uint64_t WriteCount = m_CacheStats.WriteCount;
- const uint64_t BadRequestCount = m_CacheStats.BadRequestCount;
- struct CidStoreStats StoreStats = m_CidStore.Stats();
- const uint64_t ChunkHitCount = StoreStats.HitCount;
- const uint64_t ChunkMissCount = StoreStats.MissCount;
- const uint64_t ChunkWriteCount = StoreStats.WriteCount;
- const uint64_t TotalCount = HitCount + MissCount;
-
- const uint64_t RpcRequests = m_CacheStats.RpcRequests;
- const uint64_t RpcRecordRequests = m_CacheStats.RpcRecordRequests;
- const uint64_t RpcRecordBatchRequests = m_CacheStats.RpcRecordBatchRequests;
- const uint64_t RpcValueRequests = m_CacheStats.RpcValueRequests;
- const uint64_t RpcValueBatchRequests = m_CacheStats.RpcValueBatchRequests;
- const uint64_t RpcChunkRequests = m_CacheStats.RpcChunkRequests;
- const uint64_t RpcChunkBatchRequests = m_CacheStats.RpcChunkBatchRequests;
-
- const CidStoreSize CidSize = m_CidStore.TotalSize();
- const CacheStoreSize CacheSize = m_CacheStore.TotalSize();
-
- Cbo.BeginObject("cache");
- {
- Cbo << "badrequestcount" << BadRequestCount;
- Cbo.BeginObject("rpc");
- Cbo << "count" << RpcRequests;
- Cbo << "ops" << RpcRecordBatchRequests + RpcValueBatchRequests + RpcChunkBatchRequests;
- Cbo.BeginObject("records");
- Cbo << "count" << RpcRecordRequests;
- Cbo << "ops" << RpcRecordBatchRequests;
- Cbo.EndObject();
- Cbo.BeginObject("values");
- Cbo << "count" << RpcValueRequests;
- Cbo << "ops" << RpcValueBatchRequests;
- Cbo.EndObject();
- Cbo.BeginObject("chunks");
- Cbo << "count" << RpcChunkRequests;
- Cbo << "ops" << RpcChunkBatchRequests;
- Cbo.EndObject();
- Cbo.EndObject();
-
- Cbo.BeginObject("size");
- {
- Cbo << "disk" << CacheSize.DiskSize;
- Cbo << "memory" << CacheSize.MemorySize;
- }
- Cbo.EndObject();
-
- Cbo << "hits" << HitCount << "misses" << MissCount << "writes" << WriteCount;
- Cbo << "hit_ratio" << (TotalCount > 0 ? (double(HitCount) / double(TotalCount)) : 0.0);
-
- if (m_UpstreamCache.IsActive())
- {
- Cbo << "upstream_ratio" << (HitCount > 0 ? (double(UpstreamHitCount) / double(HitCount)) : 0.0);
- Cbo << "upstream_hits" << m_CacheStats.UpstreamHitCount;
- }
-
- Cbo << "cidhits" << ChunkHitCount << "cidmisses" << ChunkMissCount << "cidwrites" << ChunkWriteCount;
-
- {
- ZenCacheStore::CacheStoreStats StoreStatsData = m_CacheStore.Stats();
- Cbo.BeginObject("store");
- Cbo << "hits" << StoreStatsData.HitCount << "misses" << StoreStatsData.MissCount << "writes" << StoreStatsData.WriteCount
- << "rejected_writes" << StoreStatsData.RejectedWriteCount << "rejected_reads" << StoreStatsData.RejectedReadCount;
- const uint64_t StoreTotal = StoreStatsData.HitCount + StoreStatsData.MissCount;
- Cbo << "hit_ratio" << (StoreTotal > 0 ? (double(StoreStatsData.HitCount) / double(StoreTotal)) : 0.0);
- EmitSnapshot("read", StoreStatsData.GetOps, Cbo);
- EmitSnapshot("write", StoreStatsData.PutOps, Cbo);
- Cbo.EndObject();
- }
- }
- Cbo.EndObject();
-
- if (m_UpstreamCache.IsActive())
- {
- EmitSnapshot("upstream_gets", m_UpstreamGetRequestTiming, Cbo);
- Cbo.BeginObject("upstream");
- {
- m_UpstreamCache.GetStatus(Cbo);
- }
- Cbo.EndObject();
- }
-
- Cbo.BeginObject("cid");
- {
- Cbo.BeginObject("size");
- {
- Cbo << "tiny" << CidSize.TinySize;
- Cbo << "small" << CidSize.SmallSize;
- Cbo << "large" << CidSize.LargeSize;
- Cbo << "total" << CidSize.TotalSize;
- }
- Cbo.EndObject();
- }
- Cbo.EndObject();
-
- return Cbo.Save();
+ Cbo << "ok" << true;
+ Request.WriteResponse(HttpResponseCode::OK, Cbo.Save());
}
void
@@ -2156,12 +2055,119 @@ HttpStructuredCacheService::HandleStatsRequest(HttpServerRequest& Request)
Request.WriteResponse(HttpResponseCode::OK, Cbo.Save());
}
-void
-HttpStructuredCacheService::HandleStatusRequest(HttpServerRequest& Request)
+CbObject
+HttpStructuredCacheService::CollectStats()
{
+ ZEN_MEMSCOPE(GetCacheHttpTag());
+
CbObjectWriter Cbo;
- Cbo << "ok" << true;
- Request.WriteResponse(HttpResponseCode::OK, Cbo.Save());
+
+ EmitSnapshot("requests", m_HttpRequests, Cbo);
+
+ const uint64_t HitCount = m_CacheStats.HitCount;
+ const uint64_t UpstreamHitCount = m_CacheStats.UpstreamHitCount;
+ const uint64_t MissCount = m_CacheStats.MissCount;
+ const uint64_t WriteCount = m_CacheStats.WriteCount;
+ const uint64_t BadRequestCount = m_CacheStats.BadRequestCount;
+ struct CidStoreStats StoreStats = m_CidStore.Stats();
+ const uint64_t ChunkHitCount = StoreStats.HitCount;
+ const uint64_t ChunkMissCount = StoreStats.MissCount;
+ const uint64_t ChunkWriteCount = StoreStats.WriteCount;
+ const uint64_t TotalCount = HitCount + MissCount;
+
+ const uint64_t RpcRequests = m_CacheStats.RpcRequests;
+ const uint64_t RpcRecordRequests = m_CacheStats.RpcRecordRequests;
+ const uint64_t RpcRecordBatchRequests = m_CacheStats.RpcRecordBatchRequests;
+ const uint64_t RpcValueRequests = m_CacheStats.RpcValueRequests;
+ const uint64_t RpcValueBatchRequests = m_CacheStats.RpcValueBatchRequests;
+ const uint64_t RpcChunkRequests = m_CacheStats.RpcChunkRequests;
+ const uint64_t RpcChunkBatchRequests = m_CacheStats.RpcChunkBatchRequests;
+
+ const CidStoreSize CidSize = m_CidStore.TotalSize();
+ const CacheStoreSize CacheSize = m_CacheStore.TotalSize();
+
+ Cbo.BeginObject("cache");
+ {
+ Cbo << "badrequestcount" << BadRequestCount;
+ Cbo.BeginObject("rpc");
+ Cbo << "count" << RpcRequests;
+ Cbo << "ops" << RpcRecordBatchRequests + RpcValueBatchRequests + RpcChunkBatchRequests;
+ Cbo.BeginObject("records");
+ Cbo << "count" << RpcRecordRequests;
+ Cbo << "ops" << RpcRecordBatchRequests;
+ Cbo.EndObject();
+ Cbo.BeginObject("values");
+ Cbo << "count" << RpcValueRequests;
+ Cbo << "ops" << RpcValueBatchRequests;
+ Cbo.EndObject();
+ Cbo.BeginObject("chunks");
+ Cbo << "count" << RpcChunkRequests;
+ Cbo << "ops" << RpcChunkBatchRequests;
+ Cbo.EndObject();
+ Cbo.EndObject();
+
+ Cbo.BeginObject("size");
+ {
+ Cbo << "disk" << CacheSize.DiskSize;
+ Cbo << "memory" << CacheSize.MemorySize;
+ }
+ Cbo.EndObject();
+
+ Cbo << "hits" << HitCount << "misses" << MissCount << "writes" << WriteCount;
+ Cbo << "hit_ratio" << (TotalCount > 0 ? (double(HitCount) / double(TotalCount)) : 0.0);
+
+ if (m_UpstreamCache.IsActive())
+ {
+ Cbo << "upstream_ratio" << (HitCount > 0 ? (double(UpstreamHitCount) / double(HitCount)) : 0.0);
+ Cbo << "upstream_hits" << m_CacheStats.UpstreamHitCount;
+ }
+
+ Cbo << "cidhits" << ChunkHitCount << "cidmisses" << ChunkMissCount << "cidwrites" << ChunkWriteCount;
+
+ {
+ ZenCacheStore::CacheStoreStats StoreStatsData = m_CacheStore.Stats();
+ Cbo.BeginObject("store");
+ Cbo << "hits" << StoreStatsData.HitCount << "misses" << StoreStatsData.MissCount << "writes" << StoreStatsData.WriteCount
+ << "rejected_writes" << StoreStatsData.RejectedWriteCount << "rejected_reads" << StoreStatsData.RejectedReadCount;
+ const uint64_t StoreTotal = StoreStatsData.HitCount + StoreStatsData.MissCount;
+ Cbo << "hit_ratio" << (StoreTotal > 0 ? (double(StoreStatsData.HitCount) / double(StoreTotal)) : 0.0);
+ EmitSnapshot("read", StoreStatsData.GetOps, Cbo);
+ EmitSnapshot("write", StoreStatsData.PutOps, Cbo);
+ Cbo.EndObject();
+ }
+ }
+ Cbo.EndObject();
+
+ if (m_UpstreamCache.IsActive())
+ {
+ EmitSnapshot("upstream_gets", m_UpstreamGetRequestTiming, Cbo);
+ Cbo.BeginObject("upstream");
+ {
+ m_UpstreamCache.GetStatus(Cbo);
+ }
+ Cbo.EndObject();
+ }
+
+ Cbo.BeginObject("cid");
+ {
+ Cbo.BeginObject("size");
+ {
+ Cbo << "tiny" << CidSize.TinySize;
+ Cbo << "small" << CidSize.SmallSize;
+ Cbo << "large" << CidSize.LargeSize;
+ Cbo << "total" << CidSize.TotalSize;
+ }
+ Cbo.EndObject();
+ }
+ Cbo.EndObject();
+
+ return Cbo.Save();
+}
+
+uint64_t
+HttpStructuredCacheService::GetActivityCounter()
+{
+ return m_HttpRequests.Count();
}
bool
diff --git a/src/zenserver/storage/cache/httpstructuredcache.h b/src/zenserver/storage/cache/httpstructuredcache.h
index d462415d4..fc80b449e 100644
--- a/src/zenserver/storage/cache/httpstructuredcache.h
+++ b/src/zenserver/storage/cache/httpstructuredcache.h
@@ -105,9 +105,10 @@ private:
void HandleCacheRequest(HttpServerRequest& Request);
void HandleCacheNamespaceRequest(HttpServerRequest& Request, std::string_view Namespace);
void HandleCacheBucketRequest(HttpServerRequest& Request, std::string_view Namespace, std::string_view Bucket);
- virtual CbObject CollectStats() override;
- virtual void HandleStatsRequest(HttpServerRequest& Request) override;
virtual void HandleStatusRequest(HttpServerRequest& Request) override;
+ virtual void HandleStatsRequest(HttpServerRequest& Request) override;
+ virtual CbObject CollectStats() override;
+ virtual uint64_t GetActivityCounter() override;
bool AreDiskWritesAllowed() const;
diff --git a/src/zenserver/storage/objectstore/objectstore.cpp b/src/zenserver/storage/objectstore/objectstore.cpp
index 493326a32..dec0c3cee 100644
--- a/src/zenserver/storage/objectstore/objectstore.cpp
+++ b/src/zenserver/storage/objectstore/objectstore.cpp
@@ -14,6 +14,7 @@
#include "zencore/compactbinarybuilder.h"
#include "zenhttp/httpcommon.h"
#include "zenhttp/httpserver.h"
+#include "zenhttp/httpstats.h"
#include <filesystem>
#include <thread>
@@ -220,17 +221,20 @@ private:
StringBuilderBase& Builder;
};
-HttpObjectStoreService::HttpObjectStoreService(HttpStatusService& StatusService, ObjectStoreConfig Cfg)
-: m_StatusService(StatusService)
+HttpObjectStoreService::HttpObjectStoreService(HttpStatsService& StatsService, HttpStatusService& StatusService, ObjectStoreConfig Cfg)
+: m_StatsService(StatsService)
+, m_StatusService(StatusService)
, m_Cfg(std::move(Cfg))
{
Inititalize();
+ m_StatsService.RegisterHandler("obj", *this);
m_StatusService.RegisterHandler("obj", *this);
}
HttpObjectStoreService::~HttpObjectStoreService()
{
m_StatusService.UnregisterHandler("obj", *this);
+ m_StatsService.UnregisterHandler("obj", *this);
}
const char*
@@ -240,8 +244,10 @@ HttpObjectStoreService::BaseUri() const
}
void
-HttpObjectStoreService::HandleRequest(zen::HttpServerRequest& Request)
+HttpObjectStoreService::HandleRequest(HttpServerRequest& Request)
{
+ metrics::OperationTiming::Scope $(m_HttpRequests);
+
if (m_Router.HandleRequest(Request) == false)
{
ZEN_LOG_WARN(LogObj, "No route found for {0}", Request.RelativeUri());
@@ -258,6 +264,23 @@ HttpObjectStoreService::HandleStatusRequest(HttpServerRequest& Request)
}
void
+HttpObjectStoreService::HandleStatsRequest(HttpServerRequest& Request)
+{
+ CbObjectWriter Cbo;
+
+ EmitSnapshot("requests", m_HttpRequests, Cbo);
+ Cbo << "total_bytes_served" << m_TotalBytesServed.load();
+
+ Request.WriteResponse(HttpResponseCode::OK, Cbo.Save());
+}
+
+uint64_t
+HttpObjectStoreService::GetActivityCounter()
+{
+ return m_HttpRequests.Count();
+}
+
+void
HttpObjectStoreService::Inititalize()
{
ZEN_TRACE_CPU("HttpObjectStoreService::Inititalize");
@@ -281,27 +304,27 @@ HttpObjectStoreService::Inititalize()
m_Router.RegisterRoute(
"",
- [this](zen::HttpRouterRequest& Request) { ListBuckets(Request); },
+ [this](HttpRouterRequest& Request) { ListBuckets(Request); },
HttpVerb::kGet);
m_Router.RegisterRoute(
"bucket",
- [this](zen::HttpRouterRequest& Request) { ListBuckets(Request); },
+ [this](HttpRouterRequest& Request) { ListBuckets(Request); },
HttpVerb::kGet);
m_Router.RegisterRoute(
"bucket",
- [this](zen::HttpRouterRequest& Request) { CreateBucket(Request); },
+ [this](HttpRouterRequest& Request) { CreateBucket(Request); },
HttpVerb::kPost | HttpVerb::kPut);
m_Router.RegisterRoute(
"bucket",
- [this](zen::HttpRouterRequest& Request) { DeleteBucket(Request); },
+ [this](HttpRouterRequest& Request) { DeleteBucket(Request); },
HttpVerb::kDelete);
m_Router.RegisterRoute(
"bucket/{path}",
- [this](zen::HttpRouterRequest& Request) {
+ [this](HttpRouterRequest& Request) {
const std::string_view Path = Request.GetCapture(1);
const auto Sep = Path.find_last_of('.');
const bool IsObject = Sep != std::string_view::npos && Path.size() - Sep > 0;
@@ -319,7 +342,7 @@ HttpObjectStoreService::Inititalize()
m_Router.RegisterRoute(
"bucket/{bucket}/{path}",
- [this](zen::HttpRouterRequest& Request) { PutObject(Request); },
+ [this](HttpRouterRequest& Request) { PutObject(Request); },
HttpVerb::kPost | HttpVerb::kPut);
}
@@ -327,7 +350,7 @@ std::filesystem::path
HttpObjectStoreService::GetBucketDirectory(std::string_view BucketName)
{
{
- std::lock_guard _(BucketsMutex);
+ std::lock_guard _(m_BucketsMutex);
if (const auto It = std::find_if(std::begin(m_Cfg.Buckets),
std::end(m_Cfg.Buckets),
@@ -342,7 +365,7 @@ HttpObjectStoreService::GetBucketDirectory(std::string_view BucketName)
}
void
-HttpObjectStoreService::ListBuckets(zen::HttpRouterRequest& Request)
+HttpObjectStoreService::ListBuckets(HttpRouterRequest& Request)
{
namespace fs = std::filesystem;
@@ -351,7 +374,7 @@ HttpObjectStoreService::ListBuckets(zen::HttpRouterRequest& Request)
CbObjectWriter Response;
Response.BeginArray("buckets");
{
- std::lock_guard _(BucketsMutex);
+ std::lock_guard _(m_BucketsMutex);
// Configured buckets
for (const ObjectStoreConfig::BucketConfig& Bucket : m_Cfg.Buckets)
@@ -428,13 +451,13 @@ HttpObjectStoreService::ListBuckets(zen::HttpRouterRequest& Request)
}
Response.EndArray();
- Response << "total_bytes_served" << TotalBytesServed.load();
+ Response << "total_bytes_served" << m_TotalBytesServed.load();
return Request.ServerRequest().WriteResponse(HttpResponseCode::OK, Response.Save());
}
void
-HttpObjectStoreService::CreateBucket(zen::HttpRouterRequest& Request)
+HttpObjectStoreService::CreateBucket(HttpRouterRequest& Request)
{
namespace fs = std::filesystem;
@@ -448,7 +471,7 @@ HttpObjectStoreService::CreateBucket(zen::HttpRouterRequest& Request)
const fs::path BucketPath = m_Cfg.RootDirectory / "buckets" / BucketName;
{
- std::lock_guard _(BucketsMutex);
+ std::lock_guard _(m_BucketsMutex);
if (!IsDir(BucketPath))
{
CreateDirectories(BucketPath);
@@ -462,7 +485,7 @@ HttpObjectStoreService::CreateBucket(zen::HttpRouterRequest& Request)
}
void
-HttpObjectStoreService::ListBucket(zen::HttpRouterRequest& Request, const std::string_view Path)
+HttpObjectStoreService::ListBucket(HttpRouterRequest& Request, const std::string_view Path)
{
namespace fs = std::filesystem;
@@ -533,7 +556,7 @@ HttpObjectStoreService::ListBucket(zen::HttpRouterRequest& Request, const std::s
if (IsDir(FullPath))
{
- std::lock_guard _(BucketsMutex);
+ std::lock_guard _(m_BucketsMutex);
Traversal.TraverseFileSystem(FullPath, FileVisitor);
}
CbObject Result = FileVisitor.GetResult();
@@ -552,7 +575,7 @@ HttpObjectStoreService::ListBucket(zen::HttpRouterRequest& Request, const std::s
}
void
-HttpObjectStoreService::DeleteBucket(zen::HttpRouterRequest& Request)
+HttpObjectStoreService::DeleteBucket(HttpRouterRequest& Request)
{
namespace fs = std::filesystem;
@@ -566,7 +589,7 @@ HttpObjectStoreService::DeleteBucket(zen::HttpRouterRequest& Request)
const fs::path BucketPath = m_Cfg.RootDirectory / "buckets" / BucketName;
{
- std::lock_guard _(BucketsMutex);
+ std::lock_guard _(m_BucketsMutex);
DeleteDirectories(BucketPath);
}
@@ -575,7 +598,7 @@ HttpObjectStoreService::DeleteBucket(zen::HttpRouterRequest& Request)
}
void
-HttpObjectStoreService::GetObject(zen::HttpRouterRequest& Request, const std::string_view Path)
+HttpObjectStoreService::GetObject(HttpRouterRequest& Request, const std::string_view Path)
{
namespace fs = std::filesystem;
@@ -606,7 +629,7 @@ HttpObjectStoreService::GetObject(zen::HttpRouterRequest& Request, const std::st
return Request.ServerRequest().WriteResponse(HttpResponseCode::NotFound);
}
- zen::HttpRanges Ranges;
+ HttpRanges Ranges;
if (Request.ServerRequest().TryGetRanges(Ranges); Ranges.size() > 1)
{
// Only a single range is supported
@@ -615,7 +638,7 @@ HttpObjectStoreService::GetObject(zen::HttpRouterRequest& Request, const std::st
FileContents File;
{
- std::lock_guard _(BucketsMutex);
+ std::lock_guard _(m_BucketsMutex);
File = ReadFile(FilePath);
}
@@ -635,7 +658,7 @@ HttpObjectStoreService::GetObject(zen::HttpRouterRequest& Request, const std::st
if (Ranges.empty())
{
- const uint64_t TotalServed = TotalBytesServed.fetch_add(FileBuf.Size()) + FileBuf.Size();
+ const uint64_t TotalServed = m_TotalBytesServed.fetch_add(FileBuf.Size()) + FileBuf.Size();
ZEN_LOG_DEBUG(LogObj,
"GET - '{}/{}' ({}) [OK] (Served: {})",
@@ -650,7 +673,7 @@ HttpObjectStoreService::GetObject(zen::HttpRouterRequest& Request, const std::st
{
const auto Range = Ranges[0];
const uint64_t RangeSize = 1 + (Range.End - Range.Start);
- const uint64_t TotalServed = TotalBytesServed.fetch_add(RangeSize) + RangeSize;
+ const uint64_t TotalServed = m_TotalBytesServed.fetch_add(RangeSize) + RangeSize;
ZEN_LOG_DEBUG(LogObj,
"GET - '{}/{}' (Range: {}-{}) ({}/{}) [OK] (Served: {})",
@@ -674,7 +697,7 @@ HttpObjectStoreService::GetObject(zen::HttpRouterRequest& Request, const std::st
}
void
-HttpObjectStoreService::PutObject(zen::HttpRouterRequest& Request)
+HttpObjectStoreService::PutObject(HttpRouterRequest& Request)
{
namespace fs = std::filesystem;
@@ -699,7 +722,7 @@ HttpObjectStoreService::PutObject(zen::HttpRouterRequest& Request)
const fs::path FileDirectory = FilePath.parent_path();
{
- std::lock_guard _(BucketsMutex);
+ std::lock_guard _(m_BucketsMutex);
if (!IsDir(FileDirectory))
{
diff --git a/src/zenserver/storage/objectstore/objectstore.h b/src/zenserver/storage/objectstore/objectstore.h
index cc47b50c4..8a25e750b 100644
--- a/src/zenserver/storage/objectstore/objectstore.h
+++ b/src/zenserver/storage/objectstore/objectstore.h
@@ -11,6 +11,7 @@
namespace zen {
class HttpRouterRequest;
+class HttpStatsService;
struct ObjectStoreConfig
{
@@ -24,31 +25,35 @@ struct ObjectStoreConfig
std::vector<BucketConfig> Buckets;
};
-class HttpObjectStoreService final : public zen::HttpService, public IHttpStatusProvider
+class HttpObjectStoreService final : public HttpService, public IHttpStatusProvider, public IHttpStatsProvider
{
public:
- HttpObjectStoreService(HttpStatusService& StatusService, ObjectStoreConfig Cfg);
+ HttpObjectStoreService(HttpStatsService& StatsService, HttpStatusService& StatusService, ObjectStoreConfig Cfg);
virtual ~HttpObjectStoreService();
virtual const char* BaseUri() const override;
- virtual void HandleRequest(zen::HttpServerRequest& Request) override;
+ virtual void HandleRequest(HttpServerRequest& Request) override;
virtual void HandleStatusRequest(HttpServerRequest& Request) override;
+ virtual void HandleStatsRequest(HttpServerRequest& Request) override;
+ virtual uint64_t GetActivityCounter() override;
private:
void Inititalize();
std::filesystem::path GetBucketDirectory(std::string_view BucketName);
- void ListBuckets(zen::HttpRouterRequest& Request);
- void CreateBucket(zen::HttpRouterRequest& Request);
- void ListBucket(zen::HttpRouterRequest& Request, const std::string_view Path);
- void DeleteBucket(zen::HttpRouterRequest& Request);
- void GetObject(zen::HttpRouterRequest& Request, const std::string_view Path);
- void PutObject(zen::HttpRouterRequest& Request);
-
- HttpStatusService& m_StatusService;
- ObjectStoreConfig m_Cfg;
- std::mutex BucketsMutex;
- HttpRequestRouter m_Router;
- std::atomic_uint64_t TotalBytesServed{0};
+ void ListBuckets(HttpRouterRequest& Request);
+ void CreateBucket(HttpRouterRequest& Request);
+ void ListBucket(HttpRouterRequest& Request, const std::string_view Path);
+ void DeleteBucket(HttpRouterRequest& Request);
+ void GetObject(HttpRouterRequest& Request, const std::string_view Path);
+ void PutObject(HttpRouterRequest& Request);
+
+ HttpStatsService& m_StatsService;
+ HttpStatusService& m_StatusService;
+ ObjectStoreConfig m_Cfg;
+ std::mutex m_BucketsMutex;
+ HttpRequestRouter m_Router;
+ std::atomic_uint64_t m_TotalBytesServed{0};
+ metrics::OperationTiming m_HttpRequests;
};
} // namespace zen
diff --git a/src/zenserver/storage/projectstore/httpprojectstore.cpp b/src/zenserver/storage/projectstore/httpprojectstore.cpp
index 03b8aa382..9d132a22a 100644
--- a/src/zenserver/storage/projectstore/httpprojectstore.cpp
+++ b/src/zenserver/storage/projectstore/httpprojectstore.cpp
@@ -836,6 +836,21 @@ HttpProjectService::HandleRequest(HttpServerRequest& Request)
}
}
+void
+HttpProjectService::HandleStatusRequest(HttpServerRequest& Request)
+{
+ ZEN_TRACE_CPU("HttpProjectService::Status");
+ CbObjectWriter Cbo;
+ Cbo << "ok" << true;
+ Request.WriteResponse(HttpResponseCode::OK, Cbo.Save());
+}
+
+void
+HttpProjectService::HandleStatsRequest(HttpServerRequest& HttpReq)
+{
+ HttpReq.WriteResponse(HttpResponseCode::OK, CollectStats());
+}
+
CbObject
HttpProjectService::CollectStats()
{
@@ -906,19 +921,10 @@ HttpProjectService::CollectStats()
return Cbo.Save();
}
-void
-HttpProjectService::HandleStatsRequest(HttpServerRequest& HttpReq)
+uint64_t
+HttpProjectService::GetActivityCounter()
{
- HttpReq.WriteResponse(HttpResponseCode::OK, CollectStats());
-}
-
-void
-HttpProjectService::HandleStatusRequest(HttpServerRequest& Request)
-{
- ZEN_TRACE_CPU("HttpProjectService::Status");
- CbObjectWriter Cbo;
- Cbo << "ok" << true;
- Request.WriteResponse(HttpResponseCode::OK, Cbo.Save());
+ return m_HttpRequests.Count();
}
void
diff --git a/src/zenserver/storage/projectstore/httpprojectstore.h b/src/zenserver/storage/projectstore/httpprojectstore.h
index 917337324..e3ed02f26 100644
--- a/src/zenserver/storage/projectstore/httpprojectstore.h
+++ b/src/zenserver/storage/projectstore/httpprojectstore.h
@@ -53,9 +53,10 @@ public:
virtual const char* BaseUri() const override;
virtual void HandleRequest(HttpServerRequest& Request) override;
- virtual CbObject CollectStats() override;
- virtual void HandleStatsRequest(HttpServerRequest& Request) override;
virtual void HandleStatusRequest(HttpServerRequest& Request) override;
+ virtual void HandleStatsRequest(HttpServerRequest& Request) override;
+ virtual CbObject CollectStats() override;
+ virtual uint64_t GetActivityCounter() override;
private:
struct ProjectStats
diff --git a/src/zenserver/storage/upstream/upstreamservice.h b/src/zenserver/storage/upstream/upstreamservice.h
index f1da03c8c..c0063c055 100644
--- a/src/zenserver/storage/upstream/upstreamservice.h
+++ b/src/zenserver/storage/upstream/upstreamservice.h
@@ -9,14 +9,14 @@ namespace zen {
class AuthMgr;
class UpstreamCache;
-class HttpUpstreamService final : public zen::HttpService
+class HttpUpstreamService final : public HttpService
{
public:
HttpUpstreamService(UpstreamCache& Upstream, AuthMgr& Mgr);
virtual ~HttpUpstreamService();
virtual const char* BaseUri() const override;
- virtual void HandleRequest(zen::HttpServerRequest& Request) override;
+ virtual void HandleRequest(HttpServerRequest& Request) override;
private:
UpstreamCache& m_Upstream;
diff --git a/src/zenserver/storage/workspaces/httpworkspaces.cpp b/src/zenserver/storage/workspaces/httpworkspaces.cpp
index 785dd62f0..0119912e9 100644
--- a/src/zenserver/storage/workspaces/httpworkspaces.cpp
+++ b/src/zenserver/storage/workspaces/httpworkspaces.cpp
@@ -110,6 +110,21 @@ HttpWorkspacesService::HandleRequest(HttpServerRequest& Request)
}
}
+void
+HttpWorkspacesService::HandleStatusRequest(HttpServerRequest& Request)
+{
+ ZEN_TRACE_CPU("HttpWorkspacesService::Status");
+ CbObjectWriter Cbo;
+ Cbo << "ok" << true;
+ Request.WriteResponse(HttpResponseCode::OK, Cbo.Save());
+}
+
+void
+HttpWorkspacesService::HandleStatsRequest(HttpServerRequest& HttpReq)
+{
+ HttpReq.WriteResponse(HttpResponseCode::OK, CollectStats());
+}
+
CbObject
HttpWorkspacesService::CollectStats()
{
@@ -153,19 +168,10 @@ HttpWorkspacesService::CollectStats()
return Cbo.Save();
}
-void
-HttpWorkspacesService::HandleStatsRequest(HttpServerRequest& HttpReq)
+uint64_t
+HttpWorkspacesService::GetActivityCounter()
{
- HttpReq.WriteResponse(HttpResponseCode::OK, CollectStats());
-}
-
-void
-HttpWorkspacesService::HandleStatusRequest(HttpServerRequest& Request)
-{
- ZEN_TRACE_CPU("HttpWorkspacesService::Status");
- CbObjectWriter Cbo;
- Cbo << "ok" << true;
- Request.WriteResponse(HttpResponseCode::OK, Cbo.Save());
+ return m_HttpRequests.Count();
}
void
diff --git a/src/zenserver/storage/workspaces/httpworkspaces.h b/src/zenserver/storage/workspaces/httpworkspaces.h
index 7c5ddeff1..4af1316f8 100644
--- a/src/zenserver/storage/workspaces/httpworkspaces.h
+++ b/src/zenserver/storage/workspaces/httpworkspaces.h
@@ -29,9 +29,10 @@ public:
virtual const char* BaseUri() const override;
virtual void HandleRequest(HttpServerRequest& Request) override;
- virtual CbObject CollectStats() override;
- virtual void HandleStatsRequest(HttpServerRequest& Request) override;
virtual void HandleStatusRequest(HttpServerRequest& Request) override;
+ virtual void HandleStatsRequest(HttpServerRequest& Request) override;
+ virtual CbObject CollectStats() override;
+ virtual uint64_t GetActivityCounter() override;
private:
struct WorkspacesStats
diff --git a/src/zenserver/storage/zenstorageserver.cpp b/src/zenserver/storage/zenstorageserver.cpp
index de00eb1c2..bc0a8f4ac 100644
--- a/src/zenserver/storage/zenstorageserver.cpp
+++ b/src/zenserver/storage/zenstorageserver.cpp
@@ -170,7 +170,7 @@ ZenStorageServer::RegisterServices()
m_Http->RegisterService(*m_HttpSessionsService);
}
- m_FrontendService = std::make_unique<HttpFrontendService>(m_ContentRoot, m_StatusService);
+ m_FrontendService = std::make_unique<HttpFrontendService>(m_ContentRoot, m_StatsService, m_StatusService);
if (m_FrontendService)
{
@@ -307,7 +307,7 @@ ZenStorageServer::InitializeServices(const ZenStorageServerConfig& ServerOptions
ObjCfg.Buckets.push_back(std::move(NewBucket));
}
- m_ObjStoreService = std::make_unique<HttpObjectStoreService>(m_StatusService, std::move(ObjCfg));
+ m_ObjStoreService = std::make_unique<HttpObjectStoreService>(m_StatsService, m_StatusService, std::move(ObjCfg));
}
if (ServerOptions.BuildStoreConfig.Enabled)