aboutsummaryrefslogtreecommitdiff
path: root/zenserver/compute/apply.cpp
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2021-08-24 16:09:54 +0200
committerStefan Boberg <[email protected]>2021-08-24 16:09:54 +0200
commit786e814b95d8dab99cc1020c889741f996c89972 (patch)
tree661a0e7f3a0bea8ff6fed7b513095a7028b7d2bb /zenserver/compute/apply.cpp
parentMerge branch 'main' of https://github.com/EpicGames/zen (diff)
downloadzen-786e814b95d8dab99cc1020c889741f996c89972.tar.xz
zen-786e814b95d8dab99cc1020c889741f996c89972.zip
WIP interface for submitting workers/jobs
Diffstat (limited to 'zenserver/compute/apply.cpp')
-rw-r--r--zenserver/compute/apply.cpp66
1 files changed, 63 insertions, 3 deletions
diff --git a/zenserver/compute/apply.cpp b/zenserver/compute/apply.cpp
index 02a714cf0..8a2750516 100644
--- a/zenserver/compute/apply.cpp
+++ b/zenserver/compute/apply.cpp
@@ -4,6 +4,7 @@
#include <zencore/compactbinary.h>
#include <zencore/compactbinarybuilder.h>
+#include <zencore/compactbinarypackage.h>
#include <zencore/filesystem.h>
#include <zencore/fmtutils.h>
#include <zencore/iobuffer.h>
@@ -323,21 +324,80 @@ HttpFunctionService::HttpFunctionService(CasStore& Store, const std::filesystem:
, m_FunctionPath(BaseDir / "func")
{
m_Router.AddPattern("job", "([[:digit:]]+)");
- m_Router.AddPattern("function", "([[:xdigit:]]{40})");
+ m_Router.AddPattern("worker", "([[:xdigit:]]{40})");
m_Router.RegisterRoute(
- "functions/{function}",
+ "workers/{worker}",
[this](HttpRouterRequest& Req) {
HttpServerRequest& HttpReq = Req.ServerRequest();
+ const IoHash WorkerId = IoHash::FromHexString(Req.GetCapture(1));
+
switch (HttpReq.RequestVerb())
{
case HttpVerb::kGet:
+ {
+ RwLock::SharedLockScope _(m_WorkerLock);
+
+ if (auto It = m_WorkerMap.find(WorkerId); It == m_WorkerMap.end())
+ {
+ return HttpReq.WriteResponse(HttpResponse::NotFound);
+ }
+ else
+ {
+ const WorkerDesc& Desc = It->second;
+ return HttpReq.WriteResponse(HttpResponse::OK, Desc.Descriptor);
+ }
+ }
break;
case HttpVerb::kPost:
{
- CbObject FunctionSpec = HttpReq.ReadPayloadObject();
+ switch (HttpReq.RequestContentType())
+ {
+ case HttpContentType::kCbObject:
+ {
+ CbObject FunctionSpec = HttpReq.ReadPayloadObject();
+
+ // Determine which pieces are missing and need to be transmitted to populate CAS
+
+ CasChunkSet ChunkSet;
+
+ FunctionSpec.IterateAttachments([&](CbFieldView Field) { ChunkSet.AddChunk(Field.GetHash()); });
+
+ m_CasStore.FilterChunks(ChunkSet);
+
+ if (ChunkSet.IsEmpty())
+ {
+ RwLock::ExclusiveLockScope _(m_WorkerLock);
+
+ m_WorkerMap.insert_or_assign(WorkerId, WorkerDesc{FunctionSpec});
+
+ return HttpReq.WriteResponse(HttpResponse::NoContent);
+ }
+ else
+ {
+ CbObjectWriter ResponseWriter;
+ ResponseWriter.BeginArray("need");
+
+ for (const IoHash& Hash : ChunkSet.GetChunkSet())
+ {
+ ResponseWriter.AddHash(Hash);
+ }
+
+ ResponseWriter.EndArray();
+
+ return HttpReq.WriteResponse(HttpResponse::NotFound, ResponseWriter.Save());
+ }
+ }
+ break;
+
+ case HttpContentType::kCbPackage:
+ {
+ CbPackage FunctionSpec = HttpReq.ReadPayloadPackage();
+ }
+ break;
+ }
}
break;
}