diff options
| author | Stefan Boberg <[email protected]> | 2021-08-28 21:10:07 +0200 |
|---|---|---|
| committer | Stefan Boberg <[email protected]> | 2021-08-28 21:10:07 +0200 |
| commit | 82d373e7fa382db18bf123f7ca69e343567c5196 (patch) | |
| tree | 34fcedc392c6b9dab7ff77d84f75877988672db8 | |
| parent | Added basic file logging (diff) | |
| parent | Merge branch 'main' of https://github.com/EpicGames/zen (diff) | |
| download | zen-82d373e7fa382db18bf123f7ca69e343567c5196.tar.xz zen-82d373e7fa382db18bf123f7ca69e343567c5196.zip | |
Merge branch 'main' of https://github.com/EpicGames/zen
| -rw-r--r-- | zencore/filesystem.cpp | 22 | ||||
| -rw-r--r-- | zencore/httpserver.cpp | 9 | ||||
| -rw-r--r-- | zencore/include/zencore/filesystem.h | 3 | ||||
| -rw-r--r-- | zencore/include/zencore/httpserver.h | 1 | ||||
| -rw-r--r-- | zencore/include/zencore/scopeguard.h | 2 | ||||
| -rw-r--r-- | zenserver/compute/apply.cpp | 353 | ||||
| -rw-r--r-- | zenserver/compute/apply.h | 9 | ||||
| -rw-r--r-- | zenserver/zenserver.cpp | 10 |
8 files changed, 334 insertions, 75 deletions
diff --git a/zencore/filesystem.cpp b/zencore/filesystem.cpp index 908810773..591630b75 100644 --- a/zencore/filesystem.cpp +++ b/zencore/filesystem.cpp @@ -400,19 +400,26 @@ CopyFile(std::filesystem::path FromPath, std::filesystem::path ToPath, const Cop return Success; } -bool +void WriteFile(std::filesystem::path Path, const IoBuffer* const* Data, size_t BufferCount) { using namespace fmt::literals; CAtlFile Outfile; HRESULT hRes = Outfile.Create(Path.c_str(), GENERIC_WRITE, FILE_SHARE_READ, CREATE_ALWAYS); + if (hRes == HRESULT_FROM_WIN32(ERROR_PATH_NOT_FOUND)) + { + zen::CreateDirectories(Path.parent_path()); + + hRes = Outfile.Create(Path.c_str(), GENERIC_WRITE, FILE_SHARE_READ, CREATE_ALWAYS); + } + if (FAILED(hRes)) { zen::ThrowSystemException(hRes, "File open failed for '{}'"_format(Path).c_str()); } - // TODO: this could be block-enlightened + // TODO: this should be block-enlightened for (size_t i = 0; i < BufferCount; ++i) { @@ -421,7 +428,7 @@ WriteFile(std::filesystem::path Path, const IoBuffer* const* Data, size_t Buffer while (WriteSize) { - uint64_t ChunkSize = zen::Min<uint64_t>(WriteSize, uint64_t(2) * 1024 * 1024 * 1024); + const uint64_t ChunkSize = zen::Min<uint64_t>(WriteSize, uint64_t(2) * 1024 * 1024 * 1024); hRes = Outfile.Write(DataPtr, gsl::narrow_cast<uint32_t>(WriteSize)); @@ -434,10 +441,17 @@ WriteFile(std::filesystem::path Path, const IoBuffer* const* Data, size_t Buffer DataPtr = reinterpret_cast<const uint8_t*>(DataPtr) + ChunkSize; } } +} - return true; +void +WriteFile(std::filesystem::path Path, IoBuffer Data) +{ + const IoBuffer* const DataPtr = &Data; + + WriteFile(Path, &DataPtr, 1); } + FileContents ReadFile(std::filesystem::path Path) { diff --git a/zencore/httpserver.cpp b/zencore/httpserver.cpp index b52886c0a..eaa8bdfcd 100644 --- a/zencore/httpserver.cpp +++ b/zencore/httpserver.cpp @@ -271,6 +271,15 @@ HttpServerRequest::~HttpServerRequest() } void +HttpServerRequest::WriteResponse(HttpResponse HttpResponseCode, CbPackage Data) +{ + // TODO: implement efficient version of this which can send package attachment + // payloads directly from disk + ZEN_UNUSED(HttpResponseCode, Data); + ZEN_NOT_IMPLEMENTED(); +} + +void HttpServerRequest::WriteResponse(HttpResponse HttpResponseCode, CbObject Data) { #if 0 diff --git a/zencore/include/zencore/filesystem.h b/zencore/include/zencore/filesystem.h index d8140932b..a2d368d6f 100644 --- a/zencore/include/zencore/filesystem.h +++ b/zencore/include/zencore/filesystem.h @@ -43,7 +43,8 @@ struct FileContents ZENCORE_API FileContents ReadFile(std::filesystem::path Path); ZENCORE_API bool ScanFile(std::filesystem::path Path, uint64_t ChunkSize, std::function<void(const void* Data, size_t Size)>&& ProcessFunc); -ZENCORE_API bool WriteFile(std::filesystem::path Path, const IoBuffer* const* Data, size_t BufferCount); +ZENCORE_API void WriteFile(std::filesystem::path Path, const IoBuffer* const* Data, size_t BufferCount); +ZENCORE_API void WriteFile(std::filesystem::path Path, IoBuffer Data); struct CopyFileOptions { diff --git a/zencore/include/zencore/httpserver.h b/zencore/include/zencore/httpserver.h index 86c121366..009fd9f2c 100644 --- a/zencore/include/zencore/httpserver.h +++ b/zencore/include/zencore/httpserver.h @@ -240,6 +240,7 @@ public: virtual void WriteResponse(HttpResponse HttpResponseCode, HttpContentType ContentType, std::u8string_view ResponseString) = 0; void WriteResponse(HttpResponse HttpResponseCode, CbObject Data); + void WriteResponse(HttpResponse HttpResponseCode, CbPackage Package); void WriteResponse(HttpResponse HttpResponseCode, HttpContentType ContentType, std::string_view ResponseString); protected: diff --git a/zencore/include/zencore/scopeguard.h b/zencore/include/zencore/scopeguard.h index ba8cd3094..00836f181 100644 --- a/zencore/include/zencore/scopeguard.h +++ b/zencore/include/zencore/scopeguard.h @@ -6,7 +6,7 @@ namespace zen { template<typename T> -class ScopeGuardImpl +class [[nodiscard]] ScopeGuardImpl { public: inline ScopeGuardImpl(T&& func) : m_guardFunc(func) {} diff --git a/zenserver/compute/apply.cpp b/zenserver/compute/apply.cpp index c07837e05..32088ecc2 100644 --- a/zenserver/compute/apply.cpp +++ b/zenserver/compute/apply.cpp @@ -5,25 +5,29 @@ #include <zencore/compactbinary.h> #include <zencore/compactbinarybuilder.h> #include <zencore/compactbinarypackage.h> +#include <zencore/compress.h> +#include <zencore/except.h> #include <zencore/filesystem.h> #include <zencore/fmtutils.h> #include <zencore/iobuffer.h> #include <zencore/iohash.h> +#include <zencore/scopeguard.h> #include <zencore/windows.h> #include <zenstore/CAS.h> - -#include <zencore/prewindows.h> - -#include <AccCtrl.h> -#include <AclAPI.h> -#include <sddl.h> - -#include <UserEnv.h> -#pragma comment(lib, "UserEnv.lib") - -#include <atlbase.h> - -#include <zencore/postwindows.h> +#include <zenstore/cidstore.h> + +// clang-format off +#if ZEN_PLATFORM_WINDOWS +# include <zencore/prewindows.h> +# include <AccCtrl.h> +# include <AclAPI.h> +# include <sddl.h> +# include <UserEnv.h> +# pragma comment(lib, "UserEnv.lib") +# include <atlbase.h> +# include <zencore/postwindows.h> +#endif +// clang-format on #include <filesystem> #include <span> @@ -322,9 +326,10 @@ SandboxedFunctionJob::SpawnJob(std::filesystem::path ExePath) return true; } -HttpFunctionService::HttpFunctionService(CasStore& Store, const std::filesystem::path& BaseDir) +HttpFunctionService::HttpFunctionService(CasStore& Store, CidStore& InCidStore, const std::filesystem::path& BaseDir) : m_Log("apply", begin(spdlog::default_logger()->sinks()), end(spdlog::default_logger()->sinks())) , m_CasStore(Store) +, m_CidStore(InCidStore) , m_SandboxPath(BaseDir / "scratch") , m_FunctionPath(BaseDir / "func") { @@ -475,85 +480,131 @@ HttpFunctionService::HttpFunctionService(CasStore& Store, const std::filesystem: }, HttpVerb::kGet | HttpVerb::kPost); - // Experimental - -#if 0 m_Router.RegisterRoute( - "jobs/sandbox", + "jobs/{worker}", [this](HttpRouterRequest& Req) { - HttpServerRequest& HttpReq = Req.ServerRequest(); + HttpServerRequest& HttpReq = Req.ServerRequest(); + const IoHash WorkerId = IoHash::FromHexString(Req.GetCapture(1)); + + WorkerDesc Worker; + + { + RwLock::SharedLockScope _(m_WorkerLock); + + if (auto It = m_WorkerMap.find(WorkerId); It == m_WorkerMap.end()) + { + return HttpReq.WriteResponse(HttpResponse::NotFound); + } + else + { + Worker = It->second; + } + } switch (HttpReq.RequestVerb()) { case HttpVerb::kGet: + // TODO: return status of all pending or executing jobs break; case HttpVerb::kPost: + switch (HttpReq.RequestContentType()) { - SandboxedJob Job; - Job.Initialize("zen_test"); - Job.SetWorkingDirectory("c:\\temp\\sandbox1"); - Job.AddWhitelistFile("c:\\temp\\sandbox1"); - Job.SpawnJob("c:\\windows\\system32\\cmd.exe"); - } - break; - } - }, - HttpVerb::kGet | HttpVerb::kPost); -#endif + case HttpContentType::kCbObject: + { + // This operation takes the proposed job spec and identifies which + // chunks are not present on this server. This list is then returned in + // the "need" list in the response - m_Router.RegisterRoute( - "jobs/prep", - [this](HttpRouterRequest& Req) { - HttpServerRequest& HttpReq = Req.ServerRequest(); + IoBuffer Payload = HttpReq.ReadPayload(); + CbObject RequestObject = LoadCompactBinaryObject(Payload); - switch (HttpReq.RequestVerb()) - { - case HttpVerb::kPost: - { - // This operation takes the proposed job spec and identifies which - // chunks are not present on this server. This list is then returned in - // the "need" list in the response + std::vector<IoHash> NeedList; - IoBuffer Payload = HttpReq.ReadPayload(); - CbObject RequestObject = LoadCompactBinaryObject(Payload); + RequestObject.IterateAttachments([&](CbFieldView Field) { + const IoHash FileHash = Field.AsHash(); - std::vector<IoHash> NeedList; + if (!m_CasStore.FindChunk(FileHash)) + { + NeedList.push_back(FileHash); + } + }); - for (auto Entry : RequestObject["files"sv]) - { - CbObjectView Ob = Entry.AsObjectView(); + if (NeedList.empty()) + { + // We already have everything - const IoHash FileHash = Ob["hash"sv].AsHash(); + CbPackage Output = ExecAction(Worker, RequestObject); - if (!m_CasStore.FindChunk(FileHash)) - { - spdlog::debug("NEED: {} {} {}", FileHash, Ob["file"sv].AsString(), Ob["size"sv].AsUInt64()); + return HttpReq.WriteResponse(HttpResponse::OK, Output); + } - NeedList.push_back(FileHash); + CbObjectWriter Cbo; + Cbo.BeginArray("need"); + + for (const IoHash& Hash : NeedList) + { + Cbo << Hash; + } + + Cbo.EndArray(); + CbObject Response = Cbo.Save(); + + return HttpReq.WriteResponse(HttpResponse::NotFound, Response); } - } + break; - CbObjectWriter Cbo; - Cbo.BeginArray("need"); + case HttpContentType::kCbPackage: + { + CbPackage Action = HttpReq.ReadPayloadPackage(); + CbObject ActionObj = Action.GetObject(); - for (const IoHash& Hash : NeedList) - { - Cbo << Hash; - } + std::span<const CbAttachment> Attachments = Action.GetAttachments(); + + int AttachmentCount = 0; + int NewAttachmentCount = 0; + uint64_t TotalAttachmentBytes = 0; + uint64_t TotalNewBytes = 0; + + for (const CbAttachment& Attachment : Attachments) + { + ZEN_ASSERT(Attachment.IsBinary()); - Cbo.EndArray(); - CbObject Response = Cbo.Save(); + const IoHash DataHash = Attachment.GetHash(); + SharedBuffer DataView = Attachment.AsBinaryView(); - return HttpReq.WriteResponse(HttpResponse::OK, Response); + TotalAttachmentBytes += DataView.GetSize(); + ++AttachmentCount; + + CasStore::InsertResult InsertResult = m_CasStore.InsertChunk(DataView.AsIoBuffer(), DataHash); + + if (InsertResult.New) + { + TotalNewBytes += DataView.GetSize(); + ++NewAttachmentCount; + } + } + + spdlog::debug("new action: {}B in {} attachments. {}B new ({} attachments)", + zen::NiceBytes(TotalAttachmentBytes), + AttachmentCount, + zen::NiceBytes(TotalNewBytes), + NewAttachmentCount); + + CbPackage Output = ExecAction(Worker, ActionObj); + + return HttpReq.WriteResponse(HttpResponse::OK, Output); + } + break; } break; } }, HttpVerb::kPost); + // This is just for reference m_Router.RegisterRoute( - "jobs", + "jobs/noop", [this](HttpRouterRequest& Req) { HttpServerRequest& HttpReq = Req.ServerRequest(); @@ -664,4 +715,180 @@ HttpFunctionService::CreateNewSandbox() return Path; } +CbPackage +HttpFunctionService::ExecAction(const WorkerDesc& Worker, CbObject Action) +{ + using namespace std::literals; + + std::filesystem::path SandboxPath = CreateNewSandbox(); + + CbObject Desc = Worker.Descriptor; + + // Manifest worker in Sandbox + + for (auto& It : Desc["executables"]) + { + CbObjectView ExecEntry = It.AsObjectView(); + + std::string_view Name = ExecEntry["name"sv].AsString(); + const IoHash Hash = ExecEntry["hash"sv].AsHash(); + const uint64_t Size = ExecEntry["size"sv].AsUInt64(); + + std::filesystem::path FilePath{SandboxPath / Name}; + IoBuffer DataBuffer = m_CasStore.FindChunk(Hash); + + if (!DataBuffer) + { + throw std::exception("Chunk missing" /* ADD CONTEXT */); + } + + zen::WriteFile(FilePath, DataBuffer); + } + + for (auto& It : Desc["dirs"]) + { + std::string_view Name = It.AsString(); + std::filesystem::path DirPath{SandboxPath / Name}; + zen::CreateDirectories(DirPath); + } + + for (auto& It : Desc["files"]) + { + CbObjectView FileEntry = It.AsObjectView(); + + std::string_view Name = FileEntry["name"sv].AsString(); + const IoHash Hash = FileEntry["hash"sv].AsHash(); + const uint64_t Size = FileEntry["size"sv].AsUInt64(); + + std::filesystem::path FilePath{SandboxPath / Name}; + IoBuffer DataBuffer = m_CasStore.FindChunk(Hash); + + if (!DataBuffer) + { + throw std::exception("Chunk missing" /* ADD CONTEXT */); + } + + zen::WriteFile(FilePath, DataBuffer); + } + + // Write out action + + zen::WriteFile(SandboxPath / "build.action", Action.GetBuffer().AsIoBuffer()); + + // Manifest inputs in sandbox + + Action.IterateAttachments([&](CbFieldView Field) { + const IoHash Hash = Field.AsHash(); + std::filesystem::path FilePath{SandboxPath / "Inputs" / Hash.ToHexString()}; + IoBuffer DataBuffer = m_CasStore.FindChunk(Hash); + + if (!DataBuffer) + { + throw std::exception("Chunk missing" /* ADD CONTEXT */); + } + + CompressedBuffer Buffer = CompressedBuffer::Compress(SharedBuffer(std::move(DataBuffer))); + + zen::WriteFile(FilePath, Buffer.GetCompressed().Flatten().AsIoBuffer()); + }); + + // Set up environment variables + + StringBuilder<1024> EnvironmentBlock; + + for (auto& It : Desc["environment"]) + { + EnvironmentBlock.Append(It.AsString()); + EnvironmentBlock.Append('\0'); + } + EnvironmentBlock.Append('\0'); + EnvironmentBlock.Append('\0'); + + // Execute process + + std::string_view ExecPath = Desc["path"].AsString(); + std::filesystem::path ExePath = SandboxPath / ExecPath; + + WideStringBuilder<512> CommandLine; + CommandLine.Append(L'"'); + CommandLine.Append(ExePath.c_str()); + CommandLine.Append(L'"'); + CommandLine.Append(L" -Build=build.action"); + + LPSECURITY_ATTRIBUTES lpProcessAttributes = nullptr; + LPSECURITY_ATTRIBUTES lpThreadAttributes = nullptr; + BOOL bInheritHandles = FALSE; + DWORD dwCreationFlags = 0; + + STARTUPINFO StartupInfo{}; + StartupInfo.cb = sizeof StartupInfo; + + PROCESS_INFORMATION ProcessInformation{}; + + BOOL Success = CreateProcessW(nullptr, + CommandLine.Data(), + lpProcessAttributes, + lpThreadAttributes, + bInheritHandles, + dwCreationFlags, + nullptr, // (LPVOID)EnvironmentBlock.c_str(), // Environment block + SandboxPath.c_str(), // Current directory + &StartupInfo, + /* out */ &ProcessInformation); + + if (!Success) + { + zen::ThrowLastError("Unable to launch process" /* TODO: Add context */); + } + + CloseHandle(ProcessInformation.hThread); + auto _ = MakeGuard([&] { CloseHandle(ProcessInformation.hProcess); }); + + DWORD Result = WaitForSingleObject(ProcessInformation.hProcess, INFINITE); + + if (Result != WAIT_OBJECT_0) + { + zen::ThrowLastError("Process wait failed" /* TODO: Add context */); + } + + DWORD ExitCode = 0; + GetExitCodeProcess(ProcessInformation.hProcess, &ExitCode); + + // Gather outputs + + FileContents OutputData = zen::ReadFile(SandboxPath / "build.output"); + + if (OutputData.ErrorCode) + { + throw std::system_error(OutputData.ErrorCode, "Failed to read build output file"); + } + + // TODO: should have a more straightforward way to perform this + ZEN_ASSERT(OutputData.Data.size() == 1); + + CbPackage OutputPackage; + CbObject Output = zen::LoadCompactBinaryObject(OutputData.Data[0]); + + Output.IterateAttachments([&](CbFieldView Field) { + IoHash Hash = Field.AsHash(); + std::filesystem::path OutputPath{SandboxPath / "Outputs" / Hash.ToHexString()}; + + FileContents ChunkData = zen::ReadFile(SandboxPath / "build.output"); + + if (ChunkData.ErrorCode) + { + throw std::system_error(ChunkData.ErrorCode, "Failed to read build output chunk file"); + } + + ZEN_ASSERT(OutputData.Data.size() == 1); + + CbAttachment Attachment(SharedBuffer(ChunkData.Data[0]), Hash); + OutputPackage.AddAttachment(Attachment); + }); + + OutputPackage.SetObject(Output); + + return OutputPackage; +} + } // namespace zen diff --git a/zenserver/compute/apply.h b/zenserver/compute/apply.h index 401077f8c..20a58507b 100644 --- a/zenserver/compute/apply.h +++ b/zenserver/compute/apply.h @@ -13,6 +13,7 @@ namespace zen { class CasStore; +class CidStore; /** * Lambda style compute function service @@ -20,7 +21,7 @@ class CasStore; class HttpFunctionService : public HttpService { public: - HttpFunctionService(CasStore& Store, const std::filesystem::path& SandboxBaseDir); + HttpFunctionService(CasStore& Store, CidStore& InCidStore, const std::filesystem::path& SandboxBaseDir); ~HttpFunctionService(); virtual const char* BaseUri() const override; @@ -30,17 +31,19 @@ private: spdlog::logger m_Log; HttpRequestRouter m_Router; CasStore& m_CasStore; + CidStore& m_CidStore; std::filesystem::path m_SandboxPath; std::filesystem::path m_FunctionPath; std::atomic<int> m_SandboxCount{0}; - std::filesystem::path CreateNewSandbox(); - struct WorkerDesc { CbObject Descriptor; }; + [[nodiscard]] std::filesystem::path CreateNewSandbox(); + [[nodiscard]] CbPackage ExecAction(const WorkerDesc& Worker, CbObject Action); + RwLock m_WorkerLock; std::unordered_map<IoHash, WorkerDesc> m_WorkerMap; }; diff --git a/zenserver/zenserver.cpp b/zenserver/zenserver.cpp index 8c2fcf35d..32a468452 100644 --- a/zenserver/zenserver.cpp +++ b/zenserver/zenserver.cpp @@ -101,26 +101,30 @@ public: // Ok so now we're configured, let's kick things off + spdlog::info("initializing storage"); + zen::CasStoreConfiguration Config; Config.RootDirectory = m_DataRoot / "cas"; m_CasStore->Initialize(Config); + m_CidStore = std::make_unique<zen::CidStore>(*m_CasStore, m_DataRoot / "cid"); + spdlog::info("instantiating project service"); m_ProjectStore = new zen::ProjectStore(*m_CasStore, m_DataRoot / "projects"); m_HttpProjectService.reset(new zen::HttpProjectService{*m_CasStore, m_ProjectStore}); m_LocalProjectService = zen::LocalProjectService::New(*m_CasStore, m_ProjectStore); + spdlog::info("instantiating compute services"); + std::filesystem::path SandboxDir = m_DataRoot / "exec" / "sandbox"; zen::CreateDirectories(SandboxDir); m_HttpLaunchService = std::make_unique<zen::HttpLaunchService>(*m_CasStore, SandboxDir); std::filesystem::path ApplySandboxDir = m_DataRoot / "exec" / "apply"; zen::CreateDirectories(ApplySandboxDir); - m_HttpFunctionService = std::make_unique<zen::HttpFunctionService>(*m_CasStore, ApplySandboxDir); - - m_CidStore = std::make_unique<zen::CidStore>(*m_CasStore, m_DataRoot / "cid"); + m_HttpFunctionService = std::make_unique<zen::HttpFunctionService>(*m_CasStore, *m_CidStore, ApplySandboxDir); if (ServiceConfig.LegacyCacheEnabled) { |