aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2021-08-28 21:10:07 +0200
committerStefan Boberg <[email protected]>2021-08-28 21:10:07 +0200
commit82d373e7fa382db18bf123f7ca69e343567c5196 (patch)
tree34fcedc392c6b9dab7ff77d84f75877988672db8
parentAdded basic file logging (diff)
parentMerge branch 'main' of https://github.com/EpicGames/zen (diff)
downloadzen-82d373e7fa382db18bf123f7ca69e343567c5196.tar.xz
zen-82d373e7fa382db18bf123f7ca69e343567c5196.zip
Merge branch 'main' of https://github.com/EpicGames/zen
-rw-r--r--zencore/filesystem.cpp22
-rw-r--r--zencore/httpserver.cpp9
-rw-r--r--zencore/include/zencore/filesystem.h3
-rw-r--r--zencore/include/zencore/httpserver.h1
-rw-r--r--zencore/include/zencore/scopeguard.h2
-rw-r--r--zenserver/compute/apply.cpp353
-rw-r--r--zenserver/compute/apply.h9
-rw-r--r--zenserver/zenserver.cpp10
8 files changed, 334 insertions, 75 deletions
diff --git a/zencore/filesystem.cpp b/zencore/filesystem.cpp
index 908810773..591630b75 100644
--- a/zencore/filesystem.cpp
+++ b/zencore/filesystem.cpp
@@ -400,19 +400,26 @@ CopyFile(std::filesystem::path FromPath, std::filesystem::path ToPath, const Cop
return Success;
}
-bool
+void
WriteFile(std::filesystem::path Path, const IoBuffer* const* Data, size_t BufferCount)
{
using namespace fmt::literals;
CAtlFile Outfile;
HRESULT hRes = Outfile.Create(Path.c_str(), GENERIC_WRITE, FILE_SHARE_READ, CREATE_ALWAYS);
+ if (hRes == HRESULT_FROM_WIN32(ERROR_PATH_NOT_FOUND))
+ {
+ zen::CreateDirectories(Path.parent_path());
+
+ hRes = Outfile.Create(Path.c_str(), GENERIC_WRITE, FILE_SHARE_READ, CREATE_ALWAYS);
+ }
+
if (FAILED(hRes))
{
zen::ThrowSystemException(hRes, "File open failed for '{}'"_format(Path).c_str());
}
- // TODO: this could be block-enlightened
+ // TODO: this should be block-enlightened
for (size_t i = 0; i < BufferCount; ++i)
{
@@ -421,7 +428,7 @@ WriteFile(std::filesystem::path Path, const IoBuffer* const* Data, size_t Buffer
while (WriteSize)
{
- uint64_t ChunkSize = zen::Min<uint64_t>(WriteSize, uint64_t(2) * 1024 * 1024 * 1024);
+ const uint64_t ChunkSize = zen::Min<uint64_t>(WriteSize, uint64_t(2) * 1024 * 1024 * 1024);
hRes = Outfile.Write(DataPtr, gsl::narrow_cast<uint32_t>(WriteSize));
@@ -434,10 +441,17 @@ WriteFile(std::filesystem::path Path, const IoBuffer* const* Data, size_t Buffer
DataPtr = reinterpret_cast<const uint8_t*>(DataPtr) + ChunkSize;
}
}
+}
- return true;
+void
+WriteFile(std::filesystem::path Path, IoBuffer Data)
+{
+ const IoBuffer* const DataPtr = &Data;
+
+ WriteFile(Path, &DataPtr, 1);
}
+
FileContents
ReadFile(std::filesystem::path Path)
{
diff --git a/zencore/httpserver.cpp b/zencore/httpserver.cpp
index b52886c0a..eaa8bdfcd 100644
--- a/zencore/httpserver.cpp
+++ b/zencore/httpserver.cpp
@@ -271,6 +271,15 @@ HttpServerRequest::~HttpServerRequest()
}
void
+HttpServerRequest::WriteResponse(HttpResponse HttpResponseCode, CbPackage Data)
+{
+ // TODO: implement efficient version of this which can send package attachment
+ // payloads directly from disk
+ ZEN_UNUSED(HttpResponseCode, Data);
+ ZEN_NOT_IMPLEMENTED();
+}
+
+void
HttpServerRequest::WriteResponse(HttpResponse HttpResponseCode, CbObject Data)
{
#if 0
diff --git a/zencore/include/zencore/filesystem.h b/zencore/include/zencore/filesystem.h
index d8140932b..a2d368d6f 100644
--- a/zencore/include/zencore/filesystem.h
+++ b/zencore/include/zencore/filesystem.h
@@ -43,7 +43,8 @@ struct FileContents
ZENCORE_API FileContents ReadFile(std::filesystem::path Path);
ZENCORE_API bool ScanFile(std::filesystem::path Path, uint64_t ChunkSize, std::function<void(const void* Data, size_t Size)>&& ProcessFunc);
-ZENCORE_API bool WriteFile(std::filesystem::path Path, const IoBuffer* const* Data, size_t BufferCount);
+ZENCORE_API void WriteFile(std::filesystem::path Path, const IoBuffer* const* Data, size_t BufferCount);
+ZENCORE_API void WriteFile(std::filesystem::path Path, IoBuffer Data);
struct CopyFileOptions
{
diff --git a/zencore/include/zencore/httpserver.h b/zencore/include/zencore/httpserver.h
index 86c121366..009fd9f2c 100644
--- a/zencore/include/zencore/httpserver.h
+++ b/zencore/include/zencore/httpserver.h
@@ -240,6 +240,7 @@ public:
virtual void WriteResponse(HttpResponse HttpResponseCode, HttpContentType ContentType, std::u8string_view ResponseString) = 0;
void WriteResponse(HttpResponse HttpResponseCode, CbObject Data);
+ void WriteResponse(HttpResponse HttpResponseCode, CbPackage Package);
void WriteResponse(HttpResponse HttpResponseCode, HttpContentType ContentType, std::string_view ResponseString);
protected:
diff --git a/zencore/include/zencore/scopeguard.h b/zencore/include/zencore/scopeguard.h
index ba8cd3094..00836f181 100644
--- a/zencore/include/zencore/scopeguard.h
+++ b/zencore/include/zencore/scopeguard.h
@@ -6,7 +6,7 @@
namespace zen {
template<typename T>
-class ScopeGuardImpl
+class [[nodiscard]] ScopeGuardImpl
{
public:
inline ScopeGuardImpl(T&& func) : m_guardFunc(func) {}
diff --git a/zenserver/compute/apply.cpp b/zenserver/compute/apply.cpp
index c07837e05..32088ecc2 100644
--- a/zenserver/compute/apply.cpp
+++ b/zenserver/compute/apply.cpp
@@ -5,25 +5,29 @@
#include <zencore/compactbinary.h>
#include <zencore/compactbinarybuilder.h>
#include <zencore/compactbinarypackage.h>
+#include <zencore/compress.h>
+#include <zencore/except.h>
#include <zencore/filesystem.h>
#include <zencore/fmtutils.h>
#include <zencore/iobuffer.h>
#include <zencore/iohash.h>
+#include <zencore/scopeguard.h>
#include <zencore/windows.h>
#include <zenstore/CAS.h>
-
-#include <zencore/prewindows.h>
-
-#include <AccCtrl.h>
-#include <AclAPI.h>
-#include <sddl.h>
-
-#include <UserEnv.h>
-#pragma comment(lib, "UserEnv.lib")
-
-#include <atlbase.h>
-
-#include <zencore/postwindows.h>
+#include <zenstore/cidstore.h>
+
+// clang-format off
+#if ZEN_PLATFORM_WINDOWS
+# include <zencore/prewindows.h>
+# include <AccCtrl.h>
+# include <AclAPI.h>
+# include <sddl.h>
+# include <UserEnv.h>
+# pragma comment(lib, "UserEnv.lib")
+# include <atlbase.h>
+# include <zencore/postwindows.h>
+#endif
+// clang-format on
#include <filesystem>
#include <span>
@@ -322,9 +326,10 @@ SandboxedFunctionJob::SpawnJob(std::filesystem::path ExePath)
return true;
}
-HttpFunctionService::HttpFunctionService(CasStore& Store, const std::filesystem::path& BaseDir)
+HttpFunctionService::HttpFunctionService(CasStore& Store, CidStore& InCidStore, const std::filesystem::path& BaseDir)
: m_Log("apply", begin(spdlog::default_logger()->sinks()), end(spdlog::default_logger()->sinks()))
, m_CasStore(Store)
+, m_CidStore(InCidStore)
, m_SandboxPath(BaseDir / "scratch")
, m_FunctionPath(BaseDir / "func")
{
@@ -475,85 +480,131 @@ HttpFunctionService::HttpFunctionService(CasStore& Store, const std::filesystem:
},
HttpVerb::kGet | HttpVerb::kPost);
- // Experimental
-
-#if 0
m_Router.RegisterRoute(
- "jobs/sandbox",
+ "jobs/{worker}",
[this](HttpRouterRequest& Req) {
- HttpServerRequest& HttpReq = Req.ServerRequest();
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+ const IoHash WorkerId = IoHash::FromHexString(Req.GetCapture(1));
+
+ WorkerDesc Worker;
+
+ {
+ RwLock::SharedLockScope _(m_WorkerLock);
+
+ if (auto It = m_WorkerMap.find(WorkerId); It == m_WorkerMap.end())
+ {
+ return HttpReq.WriteResponse(HttpResponse::NotFound);
+ }
+ else
+ {
+ Worker = It->second;
+ }
+ }
switch (HttpReq.RequestVerb())
{
case HttpVerb::kGet:
+ // TODO: return status of all pending or executing jobs
break;
case HttpVerb::kPost:
+ switch (HttpReq.RequestContentType())
{
- SandboxedJob Job;
- Job.Initialize("zen_test");
- Job.SetWorkingDirectory("c:\\temp\\sandbox1");
- Job.AddWhitelistFile("c:\\temp\\sandbox1");
- Job.SpawnJob("c:\\windows\\system32\\cmd.exe");
- }
- break;
- }
- },
- HttpVerb::kGet | HttpVerb::kPost);
-#endif
+ case HttpContentType::kCbObject:
+ {
+ // This operation takes the proposed job spec and identifies which
+ // chunks are not present on this server. This list is then returned in
+ // the "need" list in the response
- m_Router.RegisterRoute(
- "jobs/prep",
- [this](HttpRouterRequest& Req) {
- HttpServerRequest& HttpReq = Req.ServerRequest();
+ IoBuffer Payload = HttpReq.ReadPayload();
+ CbObject RequestObject = LoadCompactBinaryObject(Payload);
- switch (HttpReq.RequestVerb())
- {
- case HttpVerb::kPost:
- {
- // This operation takes the proposed job spec and identifies which
- // chunks are not present on this server. This list is then returned in
- // the "need" list in the response
+ std::vector<IoHash> NeedList;
- IoBuffer Payload = HttpReq.ReadPayload();
- CbObject RequestObject = LoadCompactBinaryObject(Payload);
+ RequestObject.IterateAttachments([&](CbFieldView Field) {
+ const IoHash FileHash = Field.AsHash();
- std::vector<IoHash> NeedList;
+ if (!m_CasStore.FindChunk(FileHash))
+ {
+ NeedList.push_back(FileHash);
+ }
+ });
- for (auto Entry : RequestObject["files"sv])
- {
- CbObjectView Ob = Entry.AsObjectView();
+ if (NeedList.empty())
+ {
+ // We already have everything
- const IoHash FileHash = Ob["hash"sv].AsHash();
+ CbPackage Output = ExecAction(Worker, RequestObject);
- if (!m_CasStore.FindChunk(FileHash))
- {
- spdlog::debug("NEED: {} {} {}", FileHash, Ob["file"sv].AsString(), Ob["size"sv].AsUInt64());
+ return HttpReq.WriteResponse(HttpResponse::OK, Output);
+ }
- NeedList.push_back(FileHash);
+ CbObjectWriter Cbo;
+ Cbo.BeginArray("need");
+
+ for (const IoHash& Hash : NeedList)
+ {
+ Cbo << Hash;
+ }
+
+ Cbo.EndArray();
+ CbObject Response = Cbo.Save();
+
+ return HttpReq.WriteResponse(HttpResponse::NotFound, Response);
}
- }
+ break;
- CbObjectWriter Cbo;
- Cbo.BeginArray("need");
+ case HttpContentType::kCbPackage:
+ {
+ CbPackage Action = HttpReq.ReadPayloadPackage();
+ CbObject ActionObj = Action.GetObject();
- for (const IoHash& Hash : NeedList)
- {
- Cbo << Hash;
- }
+ std::span<const CbAttachment> Attachments = Action.GetAttachments();
+
+ int AttachmentCount = 0;
+ int NewAttachmentCount = 0;
+ uint64_t TotalAttachmentBytes = 0;
+ uint64_t TotalNewBytes = 0;
+
+ for (const CbAttachment& Attachment : Attachments)
+ {
+ ZEN_ASSERT(Attachment.IsBinary());
- Cbo.EndArray();
- CbObject Response = Cbo.Save();
+ const IoHash DataHash = Attachment.GetHash();
+ SharedBuffer DataView = Attachment.AsBinaryView();
- return HttpReq.WriteResponse(HttpResponse::OK, Response);
+ TotalAttachmentBytes += DataView.GetSize();
+ ++AttachmentCount;
+
+ CasStore::InsertResult InsertResult = m_CasStore.InsertChunk(DataView.AsIoBuffer(), DataHash);
+
+ if (InsertResult.New)
+ {
+ TotalNewBytes += DataView.GetSize();
+ ++NewAttachmentCount;
+ }
+ }
+
+ spdlog::debug("new action: {}B in {} attachments. {}B new ({} attachments)",
+ zen::NiceBytes(TotalAttachmentBytes),
+ AttachmentCount,
+ zen::NiceBytes(TotalNewBytes),
+ NewAttachmentCount);
+
+ CbPackage Output = ExecAction(Worker, ActionObj);
+
+ return HttpReq.WriteResponse(HttpResponse::OK, Output);
+ }
+ break;
}
break;
}
},
HttpVerb::kPost);
+ // This is just for reference
m_Router.RegisterRoute(
- "jobs",
+ "jobs/noop",
[this](HttpRouterRequest& Req) {
HttpServerRequest& HttpReq = Req.ServerRequest();
@@ -664,4 +715,180 @@ HttpFunctionService::CreateNewSandbox()
return Path;
}
+CbPackage
+HttpFunctionService::ExecAction(const WorkerDesc& Worker, CbObject Action)
+{
+ using namespace std::literals;
+
+ std::filesystem::path SandboxPath = CreateNewSandbox();
+
+ CbObject Desc = Worker.Descriptor;
+
+ // Manifest worker in Sandbox
+
+ for (auto& It : Desc["executables"])
+ {
+ CbObjectView ExecEntry = It.AsObjectView();
+
+ std::string_view Name = ExecEntry["name"sv].AsString();
+ const IoHash Hash = ExecEntry["hash"sv].AsHash();
+ const uint64_t Size = ExecEntry["size"sv].AsUInt64();
+
+ std::filesystem::path FilePath{SandboxPath / Name};
+ IoBuffer DataBuffer = m_CasStore.FindChunk(Hash);
+
+ if (!DataBuffer)
+ {
+ throw std::exception("Chunk missing" /* ADD CONTEXT */);
+ }
+
+ zen::WriteFile(FilePath, DataBuffer);
+ }
+
+ for (auto& It : Desc["dirs"])
+ {
+ std::string_view Name = It.AsString();
+ std::filesystem::path DirPath{SandboxPath / Name};
+ zen::CreateDirectories(DirPath);
+ }
+
+ for (auto& It : Desc["files"])
+ {
+ CbObjectView FileEntry = It.AsObjectView();
+
+ std::string_view Name = FileEntry["name"sv].AsString();
+ const IoHash Hash = FileEntry["hash"sv].AsHash();
+ const uint64_t Size = FileEntry["size"sv].AsUInt64();
+
+ std::filesystem::path FilePath{SandboxPath / Name};
+ IoBuffer DataBuffer = m_CasStore.FindChunk(Hash);
+
+ if (!DataBuffer)
+ {
+ throw std::exception("Chunk missing" /* ADD CONTEXT */);
+ }
+
+ zen::WriteFile(FilePath, DataBuffer);
+ }
+
+ // Write out action
+
+ zen::WriteFile(SandboxPath / "build.action", Action.GetBuffer().AsIoBuffer());
+
+ // Manifest inputs in sandbox
+
+ Action.IterateAttachments([&](CbFieldView Field) {
+ const IoHash Hash = Field.AsHash();
+ std::filesystem::path FilePath{SandboxPath / "Inputs" / Hash.ToHexString()};
+ IoBuffer DataBuffer = m_CasStore.FindChunk(Hash);
+
+ if (!DataBuffer)
+ {
+ throw std::exception("Chunk missing" /* ADD CONTEXT */);
+ }
+
+ CompressedBuffer Buffer = CompressedBuffer::Compress(SharedBuffer(std::move(DataBuffer)));
+
+ zen::WriteFile(FilePath, Buffer.GetCompressed().Flatten().AsIoBuffer());
+ });
+
+ // Set up environment variables
+
+ StringBuilder<1024> EnvironmentBlock;
+
+ for (auto& It : Desc["environment"])
+ {
+ EnvironmentBlock.Append(It.AsString());
+ EnvironmentBlock.Append('\0');
+ }
+ EnvironmentBlock.Append('\0');
+ EnvironmentBlock.Append('\0');
+
+ // Execute process
+
+ std::string_view ExecPath = Desc["path"].AsString();
+ std::filesystem::path ExePath = SandboxPath / ExecPath;
+
+ WideStringBuilder<512> CommandLine;
+ CommandLine.Append(L'"');
+ CommandLine.Append(ExePath.c_str());
+ CommandLine.Append(L'"');
+ CommandLine.Append(L" -Build=build.action");
+
+ LPSECURITY_ATTRIBUTES lpProcessAttributes = nullptr;
+ LPSECURITY_ATTRIBUTES lpThreadAttributes = nullptr;
+ BOOL bInheritHandles = FALSE;
+ DWORD dwCreationFlags = 0;
+
+ STARTUPINFO StartupInfo{};
+ StartupInfo.cb = sizeof StartupInfo;
+
+ PROCESS_INFORMATION ProcessInformation{};
+
+ BOOL Success = CreateProcessW(nullptr,
+ CommandLine.Data(),
+ lpProcessAttributes,
+ lpThreadAttributes,
+ bInheritHandles,
+ dwCreationFlags,
+ nullptr, // (LPVOID)EnvironmentBlock.c_str(), // Environment block
+ SandboxPath.c_str(), // Current directory
+ &StartupInfo,
+ /* out */ &ProcessInformation);
+
+ if (!Success)
+ {
+ zen::ThrowLastError("Unable to launch process" /* TODO: Add context */);
+ }
+
+ CloseHandle(ProcessInformation.hThread);
+ auto _ = MakeGuard([&] { CloseHandle(ProcessInformation.hProcess); });
+
+ DWORD Result = WaitForSingleObject(ProcessInformation.hProcess, INFINITE);
+
+ if (Result != WAIT_OBJECT_0)
+ {
+ zen::ThrowLastError("Process wait failed" /* TODO: Add context */);
+ }
+
+ DWORD ExitCode = 0;
+ GetExitCodeProcess(ProcessInformation.hProcess, &ExitCode);
+
+ // Gather outputs
+
+ FileContents OutputData = zen::ReadFile(SandboxPath / "build.output");
+
+ if (OutputData.ErrorCode)
+ {
+ throw std::system_error(OutputData.ErrorCode, "Failed to read build output file");
+ }
+
+ // TODO: should have a more straightforward way to perform this
+ ZEN_ASSERT(OutputData.Data.size() == 1);
+
+ CbPackage OutputPackage;
+ CbObject Output = zen::LoadCompactBinaryObject(OutputData.Data[0]);
+
+ Output.IterateAttachments([&](CbFieldView Field) {
+ IoHash Hash = Field.AsHash();
+ std::filesystem::path OutputPath{SandboxPath / "Outputs" / Hash.ToHexString()};
+
+ FileContents ChunkData = zen::ReadFile(SandboxPath / "build.output");
+
+ if (ChunkData.ErrorCode)
+ {
+ throw std::system_error(ChunkData.ErrorCode, "Failed to read build output chunk file");
+ }
+
+ ZEN_ASSERT(OutputData.Data.size() == 1);
+
+ CbAttachment Attachment(SharedBuffer(ChunkData.Data[0]), Hash);
+ OutputPackage.AddAttachment(Attachment);
+ });
+
+ OutputPackage.SetObject(Output);
+
+ return OutputPackage;
+}
+
} // namespace zen
diff --git a/zenserver/compute/apply.h b/zenserver/compute/apply.h
index 401077f8c..20a58507b 100644
--- a/zenserver/compute/apply.h
+++ b/zenserver/compute/apply.h
@@ -13,6 +13,7 @@
namespace zen {
class CasStore;
+class CidStore;
/**
* Lambda style compute function service
@@ -20,7 +21,7 @@ class CasStore;
class HttpFunctionService : public HttpService
{
public:
- HttpFunctionService(CasStore& Store, const std::filesystem::path& SandboxBaseDir);
+ HttpFunctionService(CasStore& Store, CidStore& InCidStore, const std::filesystem::path& SandboxBaseDir);
~HttpFunctionService();
virtual const char* BaseUri() const override;
@@ -30,17 +31,19 @@ private:
spdlog::logger m_Log;
HttpRequestRouter m_Router;
CasStore& m_CasStore;
+ CidStore& m_CidStore;
std::filesystem::path m_SandboxPath;
std::filesystem::path m_FunctionPath;
std::atomic<int> m_SandboxCount{0};
- std::filesystem::path CreateNewSandbox();
-
struct WorkerDesc
{
CbObject Descriptor;
};
+ [[nodiscard]] std::filesystem::path CreateNewSandbox();
+ [[nodiscard]] CbPackage ExecAction(const WorkerDesc& Worker, CbObject Action);
+
RwLock m_WorkerLock;
std::unordered_map<IoHash, WorkerDesc> m_WorkerMap;
};
diff --git a/zenserver/zenserver.cpp b/zenserver/zenserver.cpp
index 8c2fcf35d..32a468452 100644
--- a/zenserver/zenserver.cpp
+++ b/zenserver/zenserver.cpp
@@ -101,26 +101,30 @@ public:
// Ok so now we're configured, let's kick things off
+ spdlog::info("initializing storage");
+
zen::CasStoreConfiguration Config;
Config.RootDirectory = m_DataRoot / "cas";
m_CasStore->Initialize(Config);
+ m_CidStore = std::make_unique<zen::CidStore>(*m_CasStore, m_DataRoot / "cid");
+
spdlog::info("instantiating project service");
m_ProjectStore = new zen::ProjectStore(*m_CasStore, m_DataRoot / "projects");
m_HttpProjectService.reset(new zen::HttpProjectService{*m_CasStore, m_ProjectStore});
m_LocalProjectService = zen::LocalProjectService::New(*m_CasStore, m_ProjectStore);
+ spdlog::info("instantiating compute services");
+
std::filesystem::path SandboxDir = m_DataRoot / "exec" / "sandbox";
zen::CreateDirectories(SandboxDir);
m_HttpLaunchService = std::make_unique<zen::HttpLaunchService>(*m_CasStore, SandboxDir);
std::filesystem::path ApplySandboxDir = m_DataRoot / "exec" / "apply";
zen::CreateDirectories(ApplySandboxDir);
- m_HttpFunctionService = std::make_unique<zen::HttpFunctionService>(*m_CasStore, ApplySandboxDir);
-
- m_CidStore = std::make_unique<zen::CidStore>(*m_CasStore, m_DataRoot / "cid");
+ m_HttpFunctionService = std::make_unique<zen::HttpFunctionService>(*m_CasStore, *m_CidStore, ApplySandboxDir);
if (ServiceConfig.LegacyCacheEnabled)
{