aboutsummaryrefslogtreecommitdiff
path: root/zenserver/compute/apply.cpp
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2021-08-28 21:08:02 +0200
committerStefan Boberg <[email protected]>2021-08-28 21:08:02 +0200
commit91caa4f800586b9edcf2bb31b976350c7c9feaca (patch)
tree4d967db9a03a308ed5c2be601ea01963ee064d02 /zenserver/compute/apply.cpp
parentWriteResponse stub for CbPackage responses (awaiting decision on format with ... (diff)
downloadzen-91caa4f800586b9edcf2bb31b976350c7c9feaca.tar.xz
zen-91caa4f800586b9edcf2bb31b976350c7c9feaca.zip
Basic implementation of function evaluation
Diffstat (limited to 'zenserver/compute/apply.cpp')
-rw-r--r--zenserver/compute/apply.cpp353
1 files changed, 290 insertions, 63 deletions
diff --git a/zenserver/compute/apply.cpp b/zenserver/compute/apply.cpp
index c07837e05..32088ecc2 100644
--- a/zenserver/compute/apply.cpp
+++ b/zenserver/compute/apply.cpp
@@ -5,25 +5,29 @@
#include <zencore/compactbinary.h>
#include <zencore/compactbinarybuilder.h>
#include <zencore/compactbinarypackage.h>
+#include <zencore/compress.h>
+#include <zencore/except.h>
#include <zencore/filesystem.h>
#include <zencore/fmtutils.h>
#include <zencore/iobuffer.h>
#include <zencore/iohash.h>
+#include <zencore/scopeguard.h>
#include <zencore/windows.h>
#include <zenstore/CAS.h>
-
-#include <zencore/prewindows.h>
-
-#include <AccCtrl.h>
-#include <AclAPI.h>
-#include <sddl.h>
-
-#include <UserEnv.h>
-#pragma comment(lib, "UserEnv.lib")
-
-#include <atlbase.h>
-
-#include <zencore/postwindows.h>
+#include <zenstore/cidstore.h>
+
+// clang-format off
+#if ZEN_PLATFORM_WINDOWS
+# include <zencore/prewindows.h>
+# include <AccCtrl.h>
+# include <AclAPI.h>
+# include <sddl.h>
+# include <UserEnv.h>
+# pragma comment(lib, "UserEnv.lib")
+# include <atlbase.h>
+# include <zencore/postwindows.h>
+#endif
+// clang-format on
#include <filesystem>
#include <span>
@@ -322,9 +326,10 @@ SandboxedFunctionJob::SpawnJob(std::filesystem::path ExePath)
return true;
}
-HttpFunctionService::HttpFunctionService(CasStore& Store, const std::filesystem::path& BaseDir)
+HttpFunctionService::HttpFunctionService(CasStore& Store, CidStore& InCidStore, const std::filesystem::path& BaseDir)
: m_Log("apply", begin(spdlog::default_logger()->sinks()), end(spdlog::default_logger()->sinks()))
, m_CasStore(Store)
+, m_CidStore(InCidStore)
, m_SandboxPath(BaseDir / "scratch")
, m_FunctionPath(BaseDir / "func")
{
@@ -475,85 +480,131 @@ HttpFunctionService::HttpFunctionService(CasStore& Store, const std::filesystem:
},
HttpVerb::kGet | HttpVerb::kPost);
- // Experimental
-
-#if 0
m_Router.RegisterRoute(
- "jobs/sandbox",
+ "jobs/{worker}",
[this](HttpRouterRequest& Req) {
- HttpServerRequest& HttpReq = Req.ServerRequest();
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+ const IoHash WorkerId = IoHash::FromHexString(Req.GetCapture(1));
+
+ WorkerDesc Worker;
+
+ {
+ RwLock::SharedLockScope _(m_WorkerLock);
+
+ if (auto It = m_WorkerMap.find(WorkerId); It == m_WorkerMap.end())
+ {
+ return HttpReq.WriteResponse(HttpResponse::NotFound);
+ }
+ else
+ {
+ Worker = It->second;
+ }
+ }
switch (HttpReq.RequestVerb())
{
case HttpVerb::kGet:
+ // TODO: return status of all pending or executing jobs
break;
case HttpVerb::kPost:
+ switch (HttpReq.RequestContentType())
{
- SandboxedJob Job;
- Job.Initialize("zen_test");
- Job.SetWorkingDirectory("c:\\temp\\sandbox1");
- Job.AddWhitelistFile("c:\\temp\\sandbox1");
- Job.SpawnJob("c:\\windows\\system32\\cmd.exe");
- }
- break;
- }
- },
- HttpVerb::kGet | HttpVerb::kPost);
-#endif
+ case HttpContentType::kCbObject:
+ {
+ // This operation takes the proposed job spec and identifies which
+ // chunks are not present on this server. This list is then returned in
+ // the "need" list in the response
- m_Router.RegisterRoute(
- "jobs/prep",
- [this](HttpRouterRequest& Req) {
- HttpServerRequest& HttpReq = Req.ServerRequest();
+ IoBuffer Payload = HttpReq.ReadPayload();
+ CbObject RequestObject = LoadCompactBinaryObject(Payload);
- switch (HttpReq.RequestVerb())
- {
- case HttpVerb::kPost:
- {
- // This operation takes the proposed job spec and identifies which
- // chunks are not present on this server. This list is then returned in
- // the "need" list in the response
+ std::vector<IoHash> NeedList;
- IoBuffer Payload = HttpReq.ReadPayload();
- CbObject RequestObject = LoadCompactBinaryObject(Payload);
+ RequestObject.IterateAttachments([&](CbFieldView Field) {
+ const IoHash FileHash = Field.AsHash();
- std::vector<IoHash> NeedList;
+ if (!m_CasStore.FindChunk(FileHash))
+ {
+ NeedList.push_back(FileHash);
+ }
+ });
- for (auto Entry : RequestObject["files"sv])
- {
- CbObjectView Ob = Entry.AsObjectView();
+ if (NeedList.empty())
+ {
+ // We already have everything
- const IoHash FileHash = Ob["hash"sv].AsHash();
+ CbPackage Output = ExecAction(Worker, RequestObject);
- if (!m_CasStore.FindChunk(FileHash))
- {
- spdlog::debug("NEED: {} {} {}", FileHash, Ob["file"sv].AsString(), Ob["size"sv].AsUInt64());
+ return HttpReq.WriteResponse(HttpResponse::OK, Output);
+ }
- NeedList.push_back(FileHash);
+ CbObjectWriter Cbo;
+ Cbo.BeginArray("need");
+
+ for (const IoHash& Hash : NeedList)
+ {
+ Cbo << Hash;
+ }
+
+ Cbo.EndArray();
+ CbObject Response = Cbo.Save();
+
+ return HttpReq.WriteResponse(HttpResponse::NotFound, Response);
}
- }
+ break;
- CbObjectWriter Cbo;
- Cbo.BeginArray("need");
+ case HttpContentType::kCbPackage:
+ {
+ CbPackage Action = HttpReq.ReadPayloadPackage();
+ CbObject ActionObj = Action.GetObject();
- for (const IoHash& Hash : NeedList)
- {
- Cbo << Hash;
- }
+ std::span<const CbAttachment> Attachments = Action.GetAttachments();
+
+ int AttachmentCount = 0;
+ int NewAttachmentCount = 0;
+ uint64_t TotalAttachmentBytes = 0;
+ uint64_t TotalNewBytes = 0;
+
+ for (const CbAttachment& Attachment : Attachments)
+ {
+ ZEN_ASSERT(Attachment.IsBinary());
- Cbo.EndArray();
- CbObject Response = Cbo.Save();
+ const IoHash DataHash = Attachment.GetHash();
+ SharedBuffer DataView = Attachment.AsBinaryView();
- return HttpReq.WriteResponse(HttpResponse::OK, Response);
+ TotalAttachmentBytes += DataView.GetSize();
+ ++AttachmentCount;
+
+ CasStore::InsertResult InsertResult = m_CasStore.InsertChunk(DataView.AsIoBuffer(), DataHash);
+
+ if (InsertResult.New)
+ {
+ TotalNewBytes += DataView.GetSize();
+ ++NewAttachmentCount;
+ }
+ }
+
+ spdlog::debug("new action: {}B in {} attachments. {}B new ({} attachments)",
+ zen::NiceBytes(TotalAttachmentBytes),
+ AttachmentCount,
+ zen::NiceBytes(TotalNewBytes),
+ NewAttachmentCount);
+
+ CbPackage Output = ExecAction(Worker, ActionObj);
+
+ return HttpReq.WriteResponse(HttpResponse::OK, Output);
+ }
+ break;
}
break;
}
},
HttpVerb::kPost);
+ // This is just for reference
m_Router.RegisterRoute(
- "jobs",
+ "jobs/noop",
[this](HttpRouterRequest& Req) {
HttpServerRequest& HttpReq = Req.ServerRequest();
@@ -664,4 +715,180 @@ HttpFunctionService::CreateNewSandbox()
return Path;
}
+CbPackage
+HttpFunctionService::ExecAction(const WorkerDesc& Worker, CbObject Action)
+{
+ using namespace std::literals;
+
+ std::filesystem::path SandboxPath = CreateNewSandbox();
+
+ CbObject Desc = Worker.Descriptor;
+
+ // Manifest worker in Sandbox
+
+ for (auto& It : Desc["executables"])
+ {
+ CbObjectView ExecEntry = It.AsObjectView();
+
+ std::string_view Name = ExecEntry["name"sv].AsString();
+ const IoHash Hash = ExecEntry["hash"sv].AsHash();
+ const uint64_t Size = ExecEntry["size"sv].AsUInt64();
+
+ std::filesystem::path FilePath{SandboxPath / Name};
+ IoBuffer DataBuffer = m_CasStore.FindChunk(Hash);
+
+ if (!DataBuffer)
+ {
+ throw std::exception("Chunk missing" /* ADD CONTEXT */);
+ }
+
+ zen::WriteFile(FilePath, DataBuffer);
+ }
+
+ for (auto& It : Desc["dirs"])
+ {
+ std::string_view Name = It.AsString();
+ std::filesystem::path DirPath{SandboxPath / Name};
+ zen::CreateDirectories(DirPath);
+ }
+
+ for (auto& It : Desc["files"])
+ {
+ CbObjectView FileEntry = It.AsObjectView();
+
+ std::string_view Name = FileEntry["name"sv].AsString();
+ const IoHash Hash = FileEntry["hash"sv].AsHash();
+ const uint64_t Size = FileEntry["size"sv].AsUInt64();
+
+ std::filesystem::path FilePath{SandboxPath / Name};
+ IoBuffer DataBuffer = m_CasStore.FindChunk(Hash);
+
+ if (!DataBuffer)
+ {
+ throw std::exception("Chunk missing" /* ADD CONTEXT */);
+ }
+
+ zen::WriteFile(FilePath, DataBuffer);
+ }
+
+ // Write out action
+
+ zen::WriteFile(SandboxPath / "build.action", Action.GetBuffer().AsIoBuffer());
+
+ // Manifest inputs in sandbox
+
+ Action.IterateAttachments([&](CbFieldView Field) {
+ const IoHash Hash = Field.AsHash();
+ std::filesystem::path FilePath{SandboxPath / "Inputs" / Hash.ToHexString()};
+ IoBuffer DataBuffer = m_CasStore.FindChunk(Hash);
+
+ if (!DataBuffer)
+ {
+ throw std::exception("Chunk missing" /* ADD CONTEXT */);
+ }
+
+ CompressedBuffer Buffer = CompressedBuffer::Compress(SharedBuffer(std::move(DataBuffer)));
+
+ zen::WriteFile(FilePath, Buffer.GetCompressed().Flatten().AsIoBuffer());
+ });
+
+ // Set up environment variables
+
+ StringBuilder<1024> EnvironmentBlock;
+
+ for (auto& It : Desc["environment"])
+ {
+ EnvironmentBlock.Append(It.AsString());
+ EnvironmentBlock.Append('\0');
+ }
+ EnvironmentBlock.Append('\0');
+ EnvironmentBlock.Append('\0');
+
+ // Execute process
+
+ std::string_view ExecPath = Desc["path"].AsString();
+ std::filesystem::path ExePath = SandboxPath / ExecPath;
+
+ WideStringBuilder<512> CommandLine;
+ CommandLine.Append(L'"');
+ CommandLine.Append(ExePath.c_str());
+ CommandLine.Append(L'"');
+ CommandLine.Append(L" -Build=build.action");
+
+ LPSECURITY_ATTRIBUTES lpProcessAttributes = nullptr;
+ LPSECURITY_ATTRIBUTES lpThreadAttributes = nullptr;
+ BOOL bInheritHandles = FALSE;
+ DWORD dwCreationFlags = 0;
+
+ STARTUPINFO StartupInfo{};
+ StartupInfo.cb = sizeof StartupInfo;
+
+ PROCESS_INFORMATION ProcessInformation{};
+
+ BOOL Success = CreateProcessW(nullptr,
+ CommandLine.Data(),
+ lpProcessAttributes,
+ lpThreadAttributes,
+ bInheritHandles,
+ dwCreationFlags,
+ nullptr, // (LPVOID)EnvironmentBlock.c_str(), // Environment block
+ SandboxPath.c_str(), // Current directory
+ &StartupInfo,
+ /* out */ &ProcessInformation);
+
+ if (!Success)
+ {
+ zen::ThrowLastError("Unable to launch process" /* TODO: Add context */);
+ }
+
+ CloseHandle(ProcessInformation.hThread);
+ auto _ = MakeGuard([&] { CloseHandle(ProcessInformation.hProcess); });
+
+ DWORD Result = WaitForSingleObject(ProcessInformation.hProcess, INFINITE);
+
+ if (Result != WAIT_OBJECT_0)
+ {
+ zen::ThrowLastError("Process wait failed" /* TODO: Add context */);
+ }
+
+ DWORD ExitCode = 0;
+ GetExitCodeProcess(ProcessInformation.hProcess, &ExitCode);
+
+ // Gather outputs
+
+ FileContents OutputData = zen::ReadFile(SandboxPath / "build.output");
+
+ if (OutputData.ErrorCode)
+ {
+ throw std::system_error(OutputData.ErrorCode, "Failed to read build output file");
+ }
+
+ // TODO: should have a more straightforward way to perform this
+ ZEN_ASSERT(OutputData.Data.size() == 1);
+
+ CbPackage OutputPackage;
+ CbObject Output = zen::LoadCompactBinaryObject(OutputData.Data[0]);
+
+ Output.IterateAttachments([&](CbFieldView Field) {
+ IoHash Hash = Field.AsHash();
+ std::filesystem::path OutputPath{SandboxPath / "Outputs" / Hash.ToHexString()};
+
+ FileContents ChunkData = zen::ReadFile(SandboxPath / "build.output");
+
+ if (ChunkData.ErrorCode)
+ {
+ throw std::system_error(ChunkData.ErrorCode, "Failed to read build output chunk file");
+ }
+
+ ZEN_ASSERT(OutputData.Data.size() == 1);
+
+ CbAttachment Attachment(SharedBuffer(ChunkData.Data[0]), Hash);
+ OutputPackage.AddAttachment(Attachment);
+ });
+
+ OutputPackage.SetObject(Output);
+
+ return OutputPackage;
+}
+
} // namespace zen