aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2021-08-24 16:09:54 +0200
committerStefan Boberg <[email protected]>2021-08-24 16:09:54 +0200
commit786e814b95d8dab99cc1020c889741f996c89972 (patch)
tree661a0e7f3a0bea8ff6fed7b513095a7028b7d2bb
parentMerge branch 'main' of https://github.com/EpicGames/zen (diff)
downloadzen-786e814b95d8dab99cc1020c889741f996c89972.tar.xz
zen-786e814b95d8dab99cc1020c889741f996c89972.zip
WIP interface for submitting workers/jobs
-rw-r--r--zenserver/compute/apply.cpp66
-rw-r--r--zenserver/compute/apply.h11
-rw-r--r--zenserver/zenserver.cpp11
3 files changed, 85 insertions, 3 deletions
diff --git a/zenserver/compute/apply.cpp b/zenserver/compute/apply.cpp
index 02a714cf0..8a2750516 100644
--- a/zenserver/compute/apply.cpp
+++ b/zenserver/compute/apply.cpp
@@ -4,6 +4,7 @@
#include <zencore/compactbinary.h>
#include <zencore/compactbinarybuilder.h>
+#include <zencore/compactbinarypackage.h>
#include <zencore/filesystem.h>
#include <zencore/fmtutils.h>
#include <zencore/iobuffer.h>
@@ -323,21 +324,80 @@ HttpFunctionService::HttpFunctionService(CasStore& Store, const std::filesystem:
, m_FunctionPath(BaseDir / "func")
{
m_Router.AddPattern("job", "([[:digit:]]+)");
- m_Router.AddPattern("function", "([[:xdigit:]]{40})");
+ m_Router.AddPattern("worker", "([[:xdigit:]]{40})");
m_Router.RegisterRoute(
- "functions/{function}",
+ "workers/{worker}",
[this](HttpRouterRequest& Req) {
HttpServerRequest& HttpReq = Req.ServerRequest();
+ const IoHash WorkerId = IoHash::FromHexString(Req.GetCapture(1));
+
switch (HttpReq.RequestVerb())
{
case HttpVerb::kGet:
+ {
+ RwLock::SharedLockScope _(m_WorkerLock);
+
+ if (auto It = m_WorkerMap.find(WorkerId); It == m_WorkerMap.end())
+ {
+ return HttpReq.WriteResponse(HttpResponse::NotFound);
+ }
+ else
+ {
+ const WorkerDesc& Desc = It->second;
+ return HttpReq.WriteResponse(HttpResponse::OK, Desc.Descriptor);
+ }
+ }
break;
case HttpVerb::kPost:
{
- CbObject FunctionSpec = HttpReq.ReadPayloadObject();
+ switch (HttpReq.RequestContentType())
+ {
+ case HttpContentType::kCbObject:
+ {
+ CbObject FunctionSpec = HttpReq.ReadPayloadObject();
+
+ // Determine which pieces are missing and need to be transmitted to populate CAS
+
+ CasChunkSet ChunkSet;
+
+ FunctionSpec.IterateAttachments([&](CbFieldView Field) { ChunkSet.AddChunk(Field.GetHash()); });
+
+ m_CasStore.FilterChunks(ChunkSet);
+
+ if (ChunkSet.IsEmpty())
+ {
+ RwLock::ExclusiveLockScope _(m_WorkerLock);
+
+ m_WorkerMap.insert_or_assign(WorkerId, WorkerDesc{FunctionSpec});
+
+ return HttpReq.WriteResponse(HttpResponse::NoContent);
+ }
+ else
+ {
+ CbObjectWriter ResponseWriter;
+ ResponseWriter.BeginArray("need");
+
+ for (const IoHash& Hash : ChunkSet.GetChunkSet())
+ {
+ ResponseWriter.AddHash(Hash);
+ }
+
+ ResponseWriter.EndArray();
+
+ return HttpReq.WriteResponse(HttpResponse::NotFound, ResponseWriter.Save());
+ }
+ }
+ break;
+
+ case HttpContentType::kCbPackage:
+ {
+ CbPackage FunctionSpec = HttpReq.ReadPayloadPackage();
+ }
+ break;
+ }
}
break;
}
diff --git a/zenserver/compute/apply.h b/zenserver/compute/apply.h
index e079317ec..cb91be07e 100644
--- a/zenserver/compute/apply.h
+++ b/zenserver/compute/apply.h
@@ -2,10 +2,13 @@
#pragma once
+#include <zencore/compactbinary.h>
#include <zencore/httpserver.h>
+#include <zencore/iohash.h>
#include <spdlog/spdlog.h>
#include <filesystem>
+#include <unordered_map>
namespace zen {
@@ -32,6 +35,14 @@ private:
std::atomic<int> m_SandboxCount{0};
std::filesystem::path CreateNewSandbox();
+
+ struct WorkerDesc
+ {
+ CbObject Descriptor;
+ };
+
+ RwLock m_WorkerLock;
+ std::unordered_map<IoHash, WorkerDesc> m_WorkerMap;
};
} // namespace zen
diff --git a/zenserver/zenserver.cpp b/zenserver/zenserver.cpp
index d9f72ed12..8c2fcf35d 100644
--- a/zenserver/zenserver.cpp
+++ b/zenserver/zenserver.cpp
@@ -54,6 +54,7 @@
#include "admin/admin.h"
#include "cache/kvcache.h"
#include "cache/structuredcache.h"
+#include "compute/apply.h"
#include "diag/diagsvcs.h"
#include "experimental/usnjournal.h"
#include "projectstore.h"
@@ -115,6 +116,10 @@ public:
zen::CreateDirectories(SandboxDir);
m_HttpLaunchService = std::make_unique<zen::HttpLaunchService>(*m_CasStore, SandboxDir);
+ std::filesystem::path ApplySandboxDir = m_DataRoot / "exec" / "apply";
+ zen::CreateDirectories(ApplySandboxDir);
+ m_HttpFunctionService = std::make_unique<zen::HttpFunctionService>(*m_CasStore, ApplySandboxDir);
+
m_CidStore = std::make_unique<zen::CidStore>(*m_CasStore, m_DataRoot / "cid");
if (ServiceConfig.LegacyCacheEnabled)
@@ -169,6 +174,11 @@ public:
{
m_Http.AddEndpoint(*m_HttpLaunchService);
}
+
+ if (m_HttpFunctionService)
+ {
+ m_Http.AddEndpoint(*m_HttpFunctionService);
+ }
}
void StartMesh(int BasePort)
@@ -296,6 +306,7 @@ private:
HttpAdminService m_AdminService;
HttpHealthService m_HealthService;
zen::Mesh m_ZenMesh{m_IoContext};
+ std::unique_ptr<zen::HttpFunctionService> m_HttpFunctionService;
bool m_DebugOptionForcedCrash = false;
};