diff options
| author | Stefan Boberg <[email protected]> | 2021-08-24 16:09:54 +0200 |
|---|---|---|
| committer | Stefan Boberg <[email protected]> | 2021-08-24 16:09:54 +0200 |
| commit | 786e814b95d8dab99cc1020c889741f996c89972 (patch) | |
| tree | 661a0e7f3a0bea8ff6fed7b513095a7028b7d2bb /zenserver/compute/apply.cpp | |
| parent | Merge branch 'main' of https://github.com/EpicGames/zen (diff) | |
| download | zen-786e814b95d8dab99cc1020c889741f996c89972.tar.xz zen-786e814b95d8dab99cc1020c889741f996c89972.zip | |
WIP interface for submitting workers/jobs
Diffstat (limited to 'zenserver/compute/apply.cpp')
| -rw-r--r-- | zenserver/compute/apply.cpp | 66 |
1 files changed, 63 insertions, 3 deletions
diff --git a/zenserver/compute/apply.cpp b/zenserver/compute/apply.cpp index 02a714cf0..8a2750516 100644 --- a/zenserver/compute/apply.cpp +++ b/zenserver/compute/apply.cpp @@ -4,6 +4,7 @@ #include <zencore/compactbinary.h> #include <zencore/compactbinarybuilder.h> +#include <zencore/compactbinarypackage.h> #include <zencore/filesystem.h> #include <zencore/fmtutils.h> #include <zencore/iobuffer.h> @@ -323,21 +324,80 @@ HttpFunctionService::HttpFunctionService(CasStore& Store, const std::filesystem: , m_FunctionPath(BaseDir / "func") { m_Router.AddPattern("job", "([[:digit:]]+)"); - m_Router.AddPattern("function", "([[:xdigit:]]{40})"); + m_Router.AddPattern("worker", "([[:xdigit:]]{40})"); m_Router.RegisterRoute( - "functions/{function}", + "workers/{worker}", [this](HttpRouterRequest& Req) { HttpServerRequest& HttpReq = Req.ServerRequest(); + const IoHash WorkerId = IoHash::FromHexString(Req.GetCapture(1)); + switch (HttpReq.RequestVerb()) { case HttpVerb::kGet: + { + RwLock::SharedLockScope _(m_WorkerLock); + + if (auto It = m_WorkerMap.find(WorkerId); It == m_WorkerMap.end()) + { + return HttpReq.WriteResponse(HttpResponse::NotFound); + } + else + { + const WorkerDesc& Desc = It->second; + return HttpReq.WriteResponse(HttpResponse::OK, Desc.Descriptor); + } + } break; case HttpVerb::kPost: { - CbObject FunctionSpec = HttpReq.ReadPayloadObject(); + switch (HttpReq.RequestContentType()) + { + case HttpContentType::kCbObject: + { + CbObject FunctionSpec = HttpReq.ReadPayloadObject(); + + // Determine which pieces are missing and need to be transmitted to populate CAS + + CasChunkSet ChunkSet; + + FunctionSpec.IterateAttachments([&](CbFieldView Field) { ChunkSet.AddChunk(Field.GetHash()); }); + + m_CasStore.FilterChunks(ChunkSet); + + if (ChunkSet.IsEmpty()) + { + RwLock::ExclusiveLockScope _(m_WorkerLock); + + m_WorkerMap.insert_or_assign(WorkerId, WorkerDesc{FunctionSpec}); + + return HttpReq.WriteResponse(HttpResponse::NoContent); + } + else + { + CbObjectWriter ResponseWriter; + ResponseWriter.BeginArray("need"); + + for (const IoHash& Hash : ChunkSet.GetChunkSet()) + { + ResponseWriter.AddHash(Hash); + } + + ResponseWriter.EndArray(); + + return HttpReq.WriteResponse(HttpResponse::NotFound, ResponseWriter.Save()); + } + } + break; + + case HttpContentType::kCbPackage: + { + CbPackage FunctionSpec = HttpReq.ReadPayloadPackage(); + } + break; + } } break; } |