diff options
| author | Stefan Boberg <[email protected]> | 2021-08-24 16:09:54 +0200 |
|---|---|---|
| committer | Stefan Boberg <[email protected]> | 2021-08-24 16:09:54 +0200 |
| commit | 786e814b95d8dab99cc1020c889741f996c89972 (patch) | |
| tree | 661a0e7f3a0bea8ff6fed7b513095a7028b7d2bb | |
| parent | Merge branch 'main' of https://github.com/EpicGames/zen (diff) | |
| download | zen-786e814b95d8dab99cc1020c889741f996c89972.tar.xz zen-786e814b95d8dab99cc1020c889741f996c89972.zip | |
WIP interface for submitting workers/jobs
| -rw-r--r-- | zenserver/compute/apply.cpp | 66 | ||||
| -rw-r--r-- | zenserver/compute/apply.h | 11 | ||||
| -rw-r--r-- | zenserver/zenserver.cpp | 11 |
3 files changed, 85 insertions, 3 deletions
diff --git a/zenserver/compute/apply.cpp b/zenserver/compute/apply.cpp index 02a714cf0..8a2750516 100644 --- a/zenserver/compute/apply.cpp +++ b/zenserver/compute/apply.cpp @@ -4,6 +4,7 @@ #include <zencore/compactbinary.h> #include <zencore/compactbinarybuilder.h> +#include <zencore/compactbinarypackage.h> #include <zencore/filesystem.h> #include <zencore/fmtutils.h> #include <zencore/iobuffer.h> @@ -323,21 +324,80 @@ HttpFunctionService::HttpFunctionService(CasStore& Store, const std::filesystem: , m_FunctionPath(BaseDir / "func") { m_Router.AddPattern("job", "([[:digit:]]+)"); - m_Router.AddPattern("function", "([[:xdigit:]]{40})"); + m_Router.AddPattern("worker", "([[:xdigit:]]{40})"); m_Router.RegisterRoute( - "functions/{function}", + "workers/{worker}", [this](HttpRouterRequest& Req) { HttpServerRequest& HttpReq = Req.ServerRequest(); + const IoHash WorkerId = IoHash::FromHexString(Req.GetCapture(1)); + switch (HttpReq.RequestVerb()) { case HttpVerb::kGet: + { + RwLock::SharedLockScope _(m_WorkerLock); + + if (auto It = m_WorkerMap.find(WorkerId); It == m_WorkerMap.end()) + { + return HttpReq.WriteResponse(HttpResponse::NotFound); + } + else + { + const WorkerDesc& Desc = It->second; + return HttpReq.WriteResponse(HttpResponse::OK, Desc.Descriptor); + } + } break; case HttpVerb::kPost: { - CbObject FunctionSpec = HttpReq.ReadPayloadObject(); + switch (HttpReq.RequestContentType()) + { + case HttpContentType::kCbObject: + { + CbObject FunctionSpec = HttpReq.ReadPayloadObject(); + + // Determine which pieces are missing and need to be transmitted to populate CAS + + CasChunkSet ChunkSet; + + FunctionSpec.IterateAttachments([&](CbFieldView Field) { ChunkSet.AddChunk(Field.GetHash()); }); + + m_CasStore.FilterChunks(ChunkSet); + + if (ChunkSet.IsEmpty()) + { + RwLock::ExclusiveLockScope _(m_WorkerLock); + + m_WorkerMap.insert_or_assign(WorkerId, WorkerDesc{FunctionSpec}); + + return HttpReq.WriteResponse(HttpResponse::NoContent); + } + else + { + CbObjectWriter ResponseWriter; + ResponseWriter.BeginArray("need"); + + for (const IoHash& Hash : ChunkSet.GetChunkSet()) + { + ResponseWriter.AddHash(Hash); + } + + ResponseWriter.EndArray(); + + return HttpReq.WriteResponse(HttpResponse::NotFound, ResponseWriter.Save()); + } + } + break; + + case HttpContentType::kCbPackage: + { + CbPackage FunctionSpec = HttpReq.ReadPayloadPackage(); + } + break; + } } break; } diff --git a/zenserver/compute/apply.h b/zenserver/compute/apply.h index e079317ec..cb91be07e 100644 --- a/zenserver/compute/apply.h +++ b/zenserver/compute/apply.h @@ -2,10 +2,13 @@ #pragma once +#include <zencore/compactbinary.h> #include <zencore/httpserver.h> +#include <zencore/iohash.h> #include <spdlog/spdlog.h> #include <filesystem> +#include <unordered_map> namespace zen { @@ -32,6 +35,14 @@ private: std::atomic<int> m_SandboxCount{0}; std::filesystem::path CreateNewSandbox(); + + struct WorkerDesc + { + CbObject Descriptor; + }; + + RwLock m_WorkerLock; + std::unordered_map<IoHash, WorkerDesc> m_WorkerMap; }; } // namespace zen diff --git a/zenserver/zenserver.cpp b/zenserver/zenserver.cpp index d9f72ed12..8c2fcf35d 100644 --- a/zenserver/zenserver.cpp +++ b/zenserver/zenserver.cpp @@ -54,6 +54,7 @@ #include "admin/admin.h" #include "cache/kvcache.h" #include "cache/structuredcache.h" +#include "compute/apply.h" #include "diag/diagsvcs.h" #include "experimental/usnjournal.h" #include "projectstore.h" @@ -115,6 +116,10 @@ public: zen::CreateDirectories(SandboxDir); m_HttpLaunchService = std::make_unique<zen::HttpLaunchService>(*m_CasStore, SandboxDir); + std::filesystem::path ApplySandboxDir = m_DataRoot / "exec" / "apply"; + zen::CreateDirectories(ApplySandboxDir); + m_HttpFunctionService = std::make_unique<zen::HttpFunctionService>(*m_CasStore, ApplySandboxDir); + m_CidStore = std::make_unique<zen::CidStore>(*m_CasStore, m_DataRoot / "cid"); if (ServiceConfig.LegacyCacheEnabled) @@ -169,6 +174,11 @@ public: { m_Http.AddEndpoint(*m_HttpLaunchService); } + + if (m_HttpFunctionService) + { + m_Http.AddEndpoint(*m_HttpFunctionService); + } } void StartMesh(int BasePort) @@ -296,6 +306,7 @@ private: HttpAdminService m_AdminService; HttpHealthService m_HealthService; zen::Mesh m_ZenMesh{m_IoContext}; + std::unique_ptr<zen::HttpFunctionService> m_HttpFunctionService; bool m_DebugOptionForcedCrash = false; }; |