diff options
| author | Stefan Boberg <[email protected]> | 2021-08-28 21:08:02 +0200 |
|---|---|---|
| committer | Stefan Boberg <[email protected]> | 2021-08-28 21:08:02 +0200 |
| commit | 91caa4f800586b9edcf2bb31b976350c7c9feaca (patch) | |
| tree | 4d967db9a03a308ed5c2be601ea01963ee064d02 /zenserver/compute/apply.cpp | |
| parent | WriteResponse stub for CbPackage responses (awaiting decision on format with ... (diff) | |
| download | zen-91caa4f800586b9edcf2bb31b976350c7c9feaca.tar.xz zen-91caa4f800586b9edcf2bb31b976350c7c9feaca.zip | |
Basic implementation of function evaluation
Diffstat (limited to 'zenserver/compute/apply.cpp')
| -rw-r--r-- | zenserver/compute/apply.cpp | 353 |
1 files changed, 290 insertions, 63 deletions
diff --git a/zenserver/compute/apply.cpp b/zenserver/compute/apply.cpp index c07837e05..32088ecc2 100644 --- a/zenserver/compute/apply.cpp +++ b/zenserver/compute/apply.cpp @@ -5,25 +5,29 @@ #include <zencore/compactbinary.h> #include <zencore/compactbinarybuilder.h> #include <zencore/compactbinarypackage.h> +#include <zencore/compress.h> +#include <zencore/except.h> #include <zencore/filesystem.h> #include <zencore/fmtutils.h> #include <zencore/iobuffer.h> #include <zencore/iohash.h> +#include <zencore/scopeguard.h> #include <zencore/windows.h> #include <zenstore/CAS.h> - -#include <zencore/prewindows.h> - -#include <AccCtrl.h> -#include <AclAPI.h> -#include <sddl.h> - -#include <UserEnv.h> -#pragma comment(lib, "UserEnv.lib") - -#include <atlbase.h> - -#include <zencore/postwindows.h> +#include <zenstore/cidstore.h> + +// clang-format off +#if ZEN_PLATFORM_WINDOWS +# include <zencore/prewindows.h> +# include <AccCtrl.h> +# include <AclAPI.h> +# include <sddl.h> +# include <UserEnv.h> +# pragma comment(lib, "UserEnv.lib") +# include <atlbase.h> +# include <zencore/postwindows.h> +#endif +// clang-format on #include <filesystem> #include <span> @@ -322,9 +326,10 @@ SandboxedFunctionJob::SpawnJob(std::filesystem::path ExePath) return true; } -HttpFunctionService::HttpFunctionService(CasStore& Store, const std::filesystem::path& BaseDir) +HttpFunctionService::HttpFunctionService(CasStore& Store, CidStore& InCidStore, const std::filesystem::path& BaseDir) : m_Log("apply", begin(spdlog::default_logger()->sinks()), end(spdlog::default_logger()->sinks())) , m_CasStore(Store) +, m_CidStore(InCidStore) , m_SandboxPath(BaseDir / "scratch") , m_FunctionPath(BaseDir / "func") { @@ -475,85 +480,131 @@ HttpFunctionService::HttpFunctionService(CasStore& Store, const std::filesystem: }, HttpVerb::kGet | HttpVerb::kPost); - // Experimental - -#if 0 m_Router.RegisterRoute( - "jobs/sandbox", + "jobs/{worker}", [this](HttpRouterRequest& Req) { - HttpServerRequest& HttpReq = Req.ServerRequest(); + HttpServerRequest& HttpReq = Req.ServerRequest(); + const IoHash WorkerId = IoHash::FromHexString(Req.GetCapture(1)); + + WorkerDesc Worker; + + { + RwLock::SharedLockScope _(m_WorkerLock); + + if (auto It = m_WorkerMap.find(WorkerId); It == m_WorkerMap.end()) + { + return HttpReq.WriteResponse(HttpResponse::NotFound); + } + else + { + Worker = It->second; + } + } switch (HttpReq.RequestVerb()) { case HttpVerb::kGet: + // TODO: return status of all pending or executing jobs break; case HttpVerb::kPost: + switch (HttpReq.RequestContentType()) { - SandboxedJob Job; - Job.Initialize("zen_test"); - Job.SetWorkingDirectory("c:\\temp\\sandbox1"); - Job.AddWhitelistFile("c:\\temp\\sandbox1"); - Job.SpawnJob("c:\\windows\\system32\\cmd.exe"); - } - break; - } - }, - HttpVerb::kGet | HttpVerb::kPost); -#endif + case HttpContentType::kCbObject: + { + // This operation takes the proposed job spec and identifies which + // chunks are not present on this server. This list is then returned in + // the "need" list in the response - m_Router.RegisterRoute( - "jobs/prep", - [this](HttpRouterRequest& Req) { - HttpServerRequest& HttpReq = Req.ServerRequest(); + IoBuffer Payload = HttpReq.ReadPayload(); + CbObject RequestObject = LoadCompactBinaryObject(Payload); - switch (HttpReq.RequestVerb()) - { - case HttpVerb::kPost: - { - // This operation takes the proposed job spec and identifies which - // chunks are not present on this server. This list is then returned in - // the "need" list in the response + std::vector<IoHash> NeedList; - IoBuffer Payload = HttpReq.ReadPayload(); - CbObject RequestObject = LoadCompactBinaryObject(Payload); + RequestObject.IterateAttachments([&](CbFieldView Field) { + const IoHash FileHash = Field.AsHash(); - std::vector<IoHash> NeedList; + if (!m_CasStore.FindChunk(FileHash)) + { + NeedList.push_back(FileHash); + } + }); - for (auto Entry : RequestObject["files"sv]) - { - CbObjectView Ob = Entry.AsObjectView(); + if (NeedList.empty()) + { + // We already have everything - const IoHash FileHash = Ob["hash"sv].AsHash(); + CbPackage Output = ExecAction(Worker, RequestObject); - if (!m_CasStore.FindChunk(FileHash)) - { - spdlog::debug("NEED: {} {} {}", FileHash, Ob["file"sv].AsString(), Ob["size"sv].AsUInt64()); + return HttpReq.WriteResponse(HttpResponse::OK, Output); + } - NeedList.push_back(FileHash); + CbObjectWriter Cbo; + Cbo.BeginArray("need"); + + for (const IoHash& Hash : NeedList) + { + Cbo << Hash; + } + + Cbo.EndArray(); + CbObject Response = Cbo.Save(); + + return HttpReq.WriteResponse(HttpResponse::NotFound, Response); } - } + break; - CbObjectWriter Cbo; - Cbo.BeginArray("need"); + case HttpContentType::kCbPackage: + { + CbPackage Action = HttpReq.ReadPayloadPackage(); + CbObject ActionObj = Action.GetObject(); - for (const IoHash& Hash : NeedList) - { - Cbo << Hash; - } + std::span<const CbAttachment> Attachments = Action.GetAttachments(); + + int AttachmentCount = 0; + int NewAttachmentCount = 0; + uint64_t TotalAttachmentBytes = 0; + uint64_t TotalNewBytes = 0; + + for (const CbAttachment& Attachment : Attachments) + { + ZEN_ASSERT(Attachment.IsBinary()); - Cbo.EndArray(); - CbObject Response = Cbo.Save(); + const IoHash DataHash = Attachment.GetHash(); + SharedBuffer DataView = Attachment.AsBinaryView(); - return HttpReq.WriteResponse(HttpResponse::OK, Response); + TotalAttachmentBytes += DataView.GetSize(); + ++AttachmentCount; + + CasStore::InsertResult InsertResult = m_CasStore.InsertChunk(DataView.AsIoBuffer(), DataHash); + + if (InsertResult.New) + { + TotalNewBytes += DataView.GetSize(); + ++NewAttachmentCount; + } + } + + spdlog::debug("new action: {}B in {} attachments. {}B new ({} attachments)", + zen::NiceBytes(TotalAttachmentBytes), + AttachmentCount, + zen::NiceBytes(TotalNewBytes), + NewAttachmentCount); + + CbPackage Output = ExecAction(Worker, ActionObj); + + return HttpReq.WriteResponse(HttpResponse::OK, Output); + } + break; } break; } }, HttpVerb::kPost); + // This is just for reference m_Router.RegisterRoute( - "jobs", + "jobs/noop", [this](HttpRouterRequest& Req) { HttpServerRequest& HttpReq = Req.ServerRequest(); @@ -664,4 +715,180 @@ HttpFunctionService::CreateNewSandbox() return Path; } +CbPackage +HttpFunctionService::ExecAction(const WorkerDesc& Worker, CbObject Action) +{ + using namespace std::literals; + + std::filesystem::path SandboxPath = CreateNewSandbox(); + + CbObject Desc = Worker.Descriptor; + + // Manifest worker in Sandbox + + for (auto& It : Desc["executables"]) + { + CbObjectView ExecEntry = It.AsObjectView(); + + std::string_view Name = ExecEntry["name"sv].AsString(); + const IoHash Hash = ExecEntry["hash"sv].AsHash(); + const uint64_t Size = ExecEntry["size"sv].AsUInt64(); + + std::filesystem::path FilePath{SandboxPath / Name}; + IoBuffer DataBuffer = m_CasStore.FindChunk(Hash); + + if (!DataBuffer) + { + throw std::exception("Chunk missing" /* ADD CONTEXT */); + } + + zen::WriteFile(FilePath, DataBuffer); + } + + for (auto& It : Desc["dirs"]) + { + std::string_view Name = It.AsString(); + std::filesystem::path DirPath{SandboxPath / Name}; + zen::CreateDirectories(DirPath); + } + + for (auto& It : Desc["files"]) + { + CbObjectView FileEntry = It.AsObjectView(); + + std::string_view Name = FileEntry["name"sv].AsString(); + const IoHash Hash = FileEntry["hash"sv].AsHash(); + const uint64_t Size = FileEntry["size"sv].AsUInt64(); + + std::filesystem::path FilePath{SandboxPath / Name}; + IoBuffer DataBuffer = m_CasStore.FindChunk(Hash); + + if (!DataBuffer) + { + throw std::exception("Chunk missing" /* ADD CONTEXT */); + } + + zen::WriteFile(FilePath, DataBuffer); + } + + // Write out action + + zen::WriteFile(SandboxPath / "build.action", Action.GetBuffer().AsIoBuffer()); + + // Manifest inputs in sandbox + + Action.IterateAttachments([&](CbFieldView Field) { + const IoHash Hash = Field.AsHash(); + std::filesystem::path FilePath{SandboxPath / "Inputs" / Hash.ToHexString()}; + IoBuffer DataBuffer = m_CasStore.FindChunk(Hash); + + if (!DataBuffer) + { + throw std::exception("Chunk missing" /* ADD CONTEXT */); + } + + CompressedBuffer Buffer = CompressedBuffer::Compress(SharedBuffer(std::move(DataBuffer))); + + zen::WriteFile(FilePath, Buffer.GetCompressed().Flatten().AsIoBuffer()); + }); + + // Set up environment variables + + StringBuilder<1024> EnvironmentBlock; + + for (auto& It : Desc["environment"]) + { + EnvironmentBlock.Append(It.AsString()); + EnvironmentBlock.Append('\0'); + } + EnvironmentBlock.Append('\0'); + EnvironmentBlock.Append('\0'); + + // Execute process + + std::string_view ExecPath = Desc["path"].AsString(); + std::filesystem::path ExePath = SandboxPath / ExecPath; + + WideStringBuilder<512> CommandLine; + CommandLine.Append(L'"'); + CommandLine.Append(ExePath.c_str()); + CommandLine.Append(L'"'); + CommandLine.Append(L" -Build=build.action"); + + LPSECURITY_ATTRIBUTES lpProcessAttributes = nullptr; + LPSECURITY_ATTRIBUTES lpThreadAttributes = nullptr; + BOOL bInheritHandles = FALSE; + DWORD dwCreationFlags = 0; + + STARTUPINFO StartupInfo{}; + StartupInfo.cb = sizeof StartupInfo; + + PROCESS_INFORMATION ProcessInformation{}; + + BOOL Success = CreateProcessW(nullptr, + CommandLine.Data(), + lpProcessAttributes, + lpThreadAttributes, + bInheritHandles, + dwCreationFlags, + nullptr, // (LPVOID)EnvironmentBlock.c_str(), // Environment block + SandboxPath.c_str(), // Current directory + &StartupInfo, + /* out */ &ProcessInformation); + + if (!Success) + { + zen::ThrowLastError("Unable to launch process" /* TODO: Add context */); + } + + CloseHandle(ProcessInformation.hThread); + auto _ = MakeGuard([&] { CloseHandle(ProcessInformation.hProcess); }); + + DWORD Result = WaitForSingleObject(ProcessInformation.hProcess, INFINITE); + + if (Result != WAIT_OBJECT_0) + { + zen::ThrowLastError("Process wait failed" /* TODO: Add context */); + } + + DWORD ExitCode = 0; + GetExitCodeProcess(ProcessInformation.hProcess, &ExitCode); + + // Gather outputs + + FileContents OutputData = zen::ReadFile(SandboxPath / "build.output"); + + if (OutputData.ErrorCode) + { + throw std::system_error(OutputData.ErrorCode, "Failed to read build output file"); + } + + // TODO: should have a more straightforward way to perform this + ZEN_ASSERT(OutputData.Data.size() == 1); + + CbPackage OutputPackage; + CbObject Output = zen::LoadCompactBinaryObject(OutputData.Data[0]); + + Output.IterateAttachments([&](CbFieldView Field) { + IoHash Hash = Field.AsHash(); + std::filesystem::path OutputPath{SandboxPath / "Outputs" / Hash.ToHexString()}; + + FileContents ChunkData = zen::ReadFile(SandboxPath / "build.output"); + + if (ChunkData.ErrorCode) + { + throw std::system_error(ChunkData.ErrorCode, "Failed to read build output chunk file"); + } + + ZEN_ASSERT(OutputData.Data.size() == 1); + + CbAttachment Attachment(SharedBuffer(ChunkData.Data[0]), Hash); + OutputPackage.AddAttachment(Attachment); + }); + + OutputPackage.SetObject(Output); + + return OutputPackage; +} + } // namespace zen |