aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJoe Kirchoff <[email protected]>2022-05-03 15:57:02 -0700
committerGitHub <[email protected]>2022-05-03 15:57:02 -0700
commit013e2c7ab88dc51d92d683e8f8ec488bdb4d08d9 (patch)
treea0ca3186204a4f7e7315962de2408082d3d723f3
parentMerge pull request #87 from EpicGames/de/fix-compactcas-threadedinsert-test (diff)
downloadzen-1.0.1.2.tar.xz
zen-1.0.1.2.zip
Initialize upstream apply in background thread (#88)v1.0.1.2
-rw-r--r--zenserver/compute/function.cpp23
-rw-r--r--zenserver/compute/function.h1
-rw-r--r--zenserver/upstream/upstreamapply.cpp22
-rw-r--r--zenserver/upstream/upstreamapply.h1
4 files changed, 36 insertions, 11 deletions
diff --git a/zenserver/compute/function.cpp b/zenserver/compute/function.cpp
index dd31013ef..171c67a6e 100644
--- a/zenserver/compute/function.cpp
+++ b/zenserver/compute/function.cpp
@@ -39,15 +39,17 @@ HttpFunctionService::HttpFunctionService(CasStore& Store,
{
m_UpstreamApply = UpstreamApply::Create({}, m_CasStore, m_CidStore);
- auto HordeUpstreamEndpoint = UpstreamApplyEndpoint::CreateHordeEndpoint(ComputeOptions,
- ComputeAuthConfig,
- StorageOptions,
- StorageAuthConfig,
- m_CasStore,
- m_CidStore,
- Mgr);
- m_UpstreamApply->RegisterEndpoint(std::move(HordeUpstreamEndpoint));
- m_UpstreamApply->Initialize();
+ InitializeThread = std::thread{[this, ComputeOptions, StorageOptions, ComputeAuthConfig, StorageAuthConfig, &Mgr] {
+ auto HordeUpstreamEndpoint = UpstreamApplyEndpoint::CreateHordeEndpoint(ComputeOptions,
+ ComputeAuthConfig,
+ StorageOptions,
+ StorageAuthConfig,
+ m_CasStore,
+ m_CidStore,
+ Mgr);
+ m_UpstreamApply->RegisterEndpoint(std::move(HordeUpstreamEndpoint));
+ m_UpstreamApply->Initialize();
+ }};
m_Router.AddPattern("job", "([[:digit:]]+)");
m_Router.AddPattern("worker", "([[:xdigit:]]{40})");
@@ -58,8 +60,7 @@ HttpFunctionService::HttpFunctionService(CasStore& Store,
[this](HttpRouterRequest& Req) {
HttpServerRequest& HttpReq = Req.ServerRequest();
- // Todo: check upstream health
- return HttpReq.WriteResponse(HttpResponseCode::OK);
+ return HttpReq.WriteResponse(m_UpstreamApply->IsHealthy() ? HttpResponseCode::OK : HttpResponseCode::ServiceUnavailable);
},
HttpVerb::kGet);
diff --git a/zenserver/compute/function.h b/zenserver/compute/function.h
index 2ddddabb4..efabe96ee 100644
--- a/zenserver/compute/function.h
+++ b/zenserver/compute/function.h
@@ -48,6 +48,7 @@ public:
virtual void HandleRequest(HttpServerRequest& Request) override;
private:
+ std::thread InitializeThread;
spdlog::logger& Log() { return m_Log; }
spdlog::logger& m_Log;
HttpRequestRouter m_Router;
diff --git a/zenserver/upstream/upstreamapply.cpp b/zenserver/upstream/upstreamapply.cpp
index 9758e7565..c397bb141 100644
--- a/zenserver/upstream/upstreamapply.cpp
+++ b/zenserver/upstream/upstreamapply.cpp
@@ -119,6 +119,22 @@ public:
return m_RunState.IsRunning;
}
+ virtual bool IsHealthy() const override
+ {
+ if (m_RunState.IsRunning)
+ {
+ for (const auto& Endpoint : m_Endpoints)
+ {
+ if (Endpoint->IsHealthy())
+ {
+ return true;
+ }
+ }
+ }
+
+ return false;
+ }
+
virtual void RegisterEndpoint(std::unique_ptr<UpstreamApplyEndpoint> Endpoint) override
{
m_Endpoints.emplace_back(std::move(Endpoint));
@@ -429,6 +445,12 @@ private:
//////////////////////////////////////////////////////////////////////////
+bool
+UpstreamApply::IsHealthy() const
+{
+ return false;
+}
+
std::unique_ptr<UpstreamApply>
UpstreamApply::Create(const UpstreamApplyOptions& Options, CasStore& CasStore, CidStore& CidStore)
{
diff --git a/zenserver/upstream/upstreamapply.h b/zenserver/upstream/upstreamapply.h
index c6e38142c..2edc6dc49 100644
--- a/zenserver/upstream/upstreamapply.h
+++ b/zenserver/upstream/upstreamapply.h
@@ -167,6 +167,7 @@ public:
virtual ~UpstreamApply() = default;
virtual bool Initialize() = 0;
+ virtual bool IsHealthy() const = 0;
virtual void RegisterEndpoint(std::unique_ptr<UpstreamApplyEndpoint> Endpoint) = 0;
struct EnqueueResult