aboutsummaryrefslogtreecommitdiff
path: root/zenserver/compute/apply.cpp
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2021-09-15 19:49:20 +0200
committerStefan Boberg <[email protected]>2021-09-15 19:49:20 +0200
commit83ccd52321a23c8f1c8a3228cbbf34b8f199a22b (patch)
tree9cf1fb68651f616aef2fa28000e4f328ef9204d8 /zenserver/compute/apply.cpp
parentAdded GetSize/GetData functions to reduce cognitive load and bridge the gap b... (diff)
parentTweaked logging to streamline access, and simplified setup code for new loggers (diff)
downloadzen-83ccd52321a23c8f1c8a3228cbbf34b8f199a22b.tar.xz
zen-83ccd52321a23c8f1c8a3228cbbf34b8f199a22b.zip
Merge branch 'main' into cbpackage-update
Diffstat (limited to 'zenserver/compute/apply.cpp')
-rw-r--r--zenserver/compute/apply.cpp188
1 files changed, 56 insertions, 132 deletions
diff --git a/zenserver/compute/apply.cpp b/zenserver/compute/apply.cpp
index b46945d88..c3d83b2b5 100644
--- a/zenserver/compute/apply.cpp
+++ b/zenserver/compute/apply.cpp
@@ -89,7 +89,7 @@ BasicFunctionJob::SpawnJob(std::filesystem::path ExePath, std::wstring CommandLi
m_ProcessHandle.Attach(ProcessInfo.hProcess);
::CloseHandle(ProcessInfo.hThread);
- spdlog::info("Created process {}", m_ProcessId);
+ ZEN_INFO("Created process {}", m_ProcessId);
return true;
}
@@ -114,7 +114,7 @@ BasicFunctionJob::Wait(uint32_t TimeoutMs)
return true;
}
- throw std::exception("Failed wait on process handle");
+ throw std::runtime_error("Failed wait on process handle");
}
int
@@ -125,12 +125,12 @@ BasicFunctionJob::ExitCode()
if (!Success)
{
- spdlog::warn("failed getting exit code");
+ ZEN_WARN("failed getting exit code");
}
if (Ec == STILL_ACTIVE)
{
- spdlog::warn("getting exit code but process is STILL_ACTIVE");
+ ZEN_WARN("getting exit code but process is STILL_ACTIVE");
}
return gsl::narrow_cast<int>(Ec);
@@ -231,7 +231,7 @@ SandboxedFunctionJob::Initialize(std::string_view AppContainerId)
if (FAILED(hRes))
{
- spdlog::error("Failed creating app container SID");
+ ZEN_ERROR("Failed creating app container SID");
}
}
@@ -240,12 +240,12 @@ SandboxedFunctionJob::Initialize(std::string_view AppContainerId)
PWSTR Str = nullptr;
::ConvertSidToStringSid(m_AppContainerSid, &Str);
- spdlog::info("AppContainer SID : '{}'", WideToUtf8(Str));
+ ZEN_INFO("AppContainer SID : '{}'", WideToUtf8(Str));
PWSTR Path = nullptr;
if (SUCCEEDED(::GetAppContainerFolderPath(Str, &Path)))
{
- spdlog::info("AppContainer folder: '{}'", WideToUtf8(Path));
+ ZEN_INFO("AppContainer folder: '{}'", WideToUtf8(Path));
::CoTaskMemFree(Path);
}
@@ -321,13 +321,13 @@ SandboxedFunctionJob::SpawnJob(std::filesystem::path ExePath)
return false;
}
- spdlog::info("Created process {}", ProcessInfo.dwProcessId);
+ ZEN_INFO("Created process {}", ProcessInfo.dwProcessId);
return true;
}
HttpFunctionService::HttpFunctionService(CasStore& Store, CidStore& InCidStore, const std::filesystem::path& BaseDir)
-: m_Log("apply", begin(spdlog::default_logger()->sinks()), end(spdlog::default_logger()->sinks()))
+: m_Log(logging::Get("apply"))
, m_CasStore(Store)
, m_CidStore(InCidStore)
, m_SandboxPath(BaseDir / "scratch")
@@ -378,6 +378,10 @@ HttpFunctionService::HttpFunctionService(CasStore& Store, CidStore& InCidStore,
ChunkSet.AddChunk(Hash);
});
+ // Note that we store executables uncompressed to make it
+ // more straightforward and efficient to materialize them, hence
+ // the CAS lookup here instead of CID for the input payloads
+
m_CasStore.FilterChunks(ChunkSet);
if (ChunkSet.IsEmpty())
@@ -386,7 +390,7 @@ HttpFunctionService::HttpFunctionService(CasStore& Store, CidStore& InCidStore,
m_WorkerMap.insert_or_assign(WorkerId, WorkerDesc{FunctionSpec});
- spdlog::debug("worker {}: all attachments already available", WorkerId);
+ ZEN_DEBUG("worker {}: all attachments already available", WorkerId);
return HttpReq.WriteResponse(HttpResponseCode::NoContent);
}
@@ -397,14 +401,14 @@ HttpFunctionService::HttpFunctionService(CasStore& Store, CidStore& InCidStore,
for (const IoHash& Hash : ChunkSet.GetChunkSet())
{
- spdlog::debug("worker {}: need chunk {}", WorkerId, Hash);
+ ZEN_DEBUG("worker {}: need chunk {}", WorkerId, Hash);
ResponseWriter.AddHash(Hash);
}
ResponseWriter.EndArray();
- spdlog::debug("worker {}: need {} attachments", WorkerId, ChunkSet.GetChunkSet().size());
+ ZEN_DEBUG("worker {}: need {} attachments", WorkerId, ChunkSet.GetChunkSet().size());
return HttpReq.WriteResponse(HttpResponseCode::NotFound, ResponseWriter.Save());
}
@@ -426,31 +430,35 @@ HttpFunctionService::HttpFunctionService(CasStore& Store, CidStore& InCidStore,
for (const CbAttachment& Attachment : Attachments)
{
- ZEN_ASSERT(Attachment.IsBinary());
+ ZEN_ASSERT(Attachment.IsCompressedBinary());
- const IoHash DataHash = Attachment.GetHash();
- CompressedBuffer DataView = Attachment.AsCompressedBinary();
+ const IoHash DataHash = Attachment.GetHash();
+ CompressedBuffer DataView = Attachment.AsCompressedBinary();
+ SharedBuffer Decompressed = DataView.Decompress();
+ const uint64_t DecompressedSize = DataView.GetRawSize();
- TotalAttachmentBytes += DataView.GetCompressedSize();
+ TotalAttachmentBytes += DecompressedSize;
++AttachmentCount;
- IoBuffer Payload = DataView.GetCompressed().Flatten().AsIoBuffer();
+ // Note that we store executables uncompressed to make it
+ // more straightforward and efficient to materialize them
- CasStore::InsertResult InsertResult = m_CasStore.InsertChunk(Payload, DataHash);
+ const CasStore::InsertResult InsertResult =
+ m_CasStore.InsertChunk(Decompressed.AsIoBuffer(), IoHash::FromBLAKE3(DataView.GetRawHash()));
if (InsertResult.New)
{
- TotalNewBytes += Payload.Size();
+ TotalNewBytes += DecompressedSize;
++NewAttachmentCount;
}
}
- spdlog::debug("worker {}: {} in {} attachments, {} in {} new attachments",
- WorkerId,
- zen::NiceBytes(TotalAttachmentBytes),
- AttachmentCount,
- zen::NiceBytes(TotalNewBytes),
- NewAttachmentCount);
+ ZEN_DEBUG("worker {}: {} in {} attachments, {} in {} new attachments",
+ WorkerId,
+ zen::NiceBytes(TotalAttachmentBytes),
+ AttachmentCount,
+ zen::NiceBytes(TotalNewBytes),
+ NewAttachmentCount);
RwLock::ExclusiveLockScope _(m_WorkerLock);
@@ -526,7 +534,7 @@ HttpFunctionService::HttpFunctionService(CasStore& Store, CidStore& InCidStore,
RequestObject.IterateAttachments([&](CbFieldView Field) {
const IoHash FileHash = Field.AsHash();
- if (!m_CasStore.FindChunk(FileHash))
+ if (!m_CidStore.ContainsChunk(FileHash))
{
NeedList.push_back(FileHash);
}
@@ -570,28 +578,30 @@ HttpFunctionService::HttpFunctionService(CasStore& Store, CidStore& InCidStore,
for (const CbAttachment& Attachment : Attachments)
{
- ZEN_ASSERT(Attachment.IsBinary());
+ ZEN_ASSERT(Attachment.IsCompressedBinary());
+
+ const IoHash DataHash = Attachment.GetHash();
+ CompressedBuffer DataView = Attachment.AsCompressedBinary();
- const IoHash DataHash = Attachment.GetHash();
- SharedBuffer DataView = Attachment.AsBinary();
+ const uint64_t CompressedSize = DataView.GetCompressedSize();
- TotalAttachmentBytes += DataView.GetSize();
+ TotalAttachmentBytes += CompressedSize;
++AttachmentCount;
- CasStore::InsertResult InsertResult = m_CasStore.InsertChunk(DataView.AsIoBuffer(), DataHash);
+ const CasStore::InsertResult InsertResult = m_CidStore.AddChunk(DataView);
if (InsertResult.New)
{
- TotalNewBytes += DataView.GetSize();
+ TotalNewBytes += CompressedSize;
++NewAttachmentCount;
}
}
- spdlog::debug("new action: {}B in {} attachments. {}B new ({} attachments)",
- zen::NiceBytes(TotalAttachmentBytes),
- AttachmentCount,
- zen::NiceBytes(TotalNewBytes),
- NewAttachmentCount);
+ ZEN_DEBUG("new action: {} in {} attachments. {} new ({} attachments)",
+ zen::NiceBytes(TotalAttachmentBytes),
+ AttachmentCount,
+ zen::NiceBytes(TotalNewBytes),
+ NewAttachmentCount);
CbPackage Output = ExecAction(Worker, ActionObj);
@@ -603,90 +613,6 @@ HttpFunctionService::HttpFunctionService(CasStore& Store, CidStore& InCidStore,
}
},
HttpVerb::kPost);
-
- // This is just for reference
- m_Router.RegisterRoute(
- "jobs/noop",
- [this](HttpRouterRequest& Req) {
- HttpServerRequest& HttpReq = Req.ServerRequest();
-
- switch (HttpReq.RequestVerb())
- {
- case HttpVerb::kGet:
- break;
-
- case HttpVerb::kPost:
- {
- IoBuffer Payload = HttpReq.ReadPayload();
- CbObject RequestObject = LoadCompactBinaryObject(Payload);
-
- bool AllOk = true;
-
- std::vector<IoHash> NeedList;
-
- std::filesystem::path SandboxDir{CreateNewSandbox()};
-
- spdlog::debug("setting up job in sandbox '{}'", SandboxDir);
-
- zen::DeleteDirectories(SandboxDir);
- zen::CreateDirectories(SandboxDir);
-
- for (auto Entry : RequestObject["files"sv])
- {
- CbObjectView Ob = Entry.AsObjectView();
-
- std::string_view FileName = Ob["file"sv].AsString();
- const IoHash FileHash = Ob["hash"sv].AsHash();
- uint64_t FileSize = Ob["size"sv].AsUInt64();
-
- if (IoBuffer Chunk = m_CasStore.FindChunk(FileHash); !Chunk)
- {
- spdlog::debug("MISSING: {} {} {}", FileHash, FileName, FileSize);
- AllOk = false;
-
- NeedList.push_back(FileHash);
- }
- else
- {
- std::filesystem::path FullPath = SandboxDir / FileName;
-
- const IoBuffer* Chunks[] = {&Chunk};
-
- zen::WriteFile(FullPath, Chunks, 1);
- }
- }
-
- if (!AllOk)
- {
- // TODO: Could report all the missing pieces in the response here
- return HttpReq.WriteResponse(HttpResponseCode::NotFound);
- }
-
- std::string Executable8{RequestObject["cmd"].AsString()};
- std::string Args8{RequestObject["args"].AsString()};
-
- std::wstring Executable = Utf8ToWide(Executable8);
- std::wstring Args = Utf8ToWide(Args8);
-
- spdlog::debug("spawning job in sandbox '{}': '{}' '{}'", SandboxDir, Executable8, Args8);
-
- std::filesystem::path ExeName = SandboxDir / Executable;
-
- BasicFunctionJob Job;
- Job.SetWorkingDirectory(SandboxDir);
- Job.SpawnJob(ExeName, Args);
- Job.Wait();
-
- CbObjectWriter Response;
-
- Response << "exitcode" << Job.ExitCode();
-
- return HttpReq.WriteResponse(HttpResponseCode::OK, Response.Save());
- }
- break;
- }
- },
- HttpVerb::kGet | HttpVerb::kPost);
}
HttpFunctionService::~HttpFunctionService()
@@ -704,7 +630,7 @@ HttpFunctionService::HandleRequest(HttpServerRequest& Request)
{
if (m_Router.HandleRequest(Request) == false)
{
- m_Log.warn("No route found for {0}", Request.RelativeUri());
+ ZEN_WARN("No route found for {0}", Request.RelativeUri());
}
}
@@ -741,7 +667,7 @@ HttpFunctionService::ExecAction(const WorkerDesc& Worker, CbObject Action)
if (!DataBuffer)
{
- throw std::exception("Chunk missing" /* ADD CONTEXT */);
+ throw std::runtime_error("Chunk missing" /* ADD CONTEXT */);
}
zen::WriteFile(FilePath, DataBuffer);
@@ -767,7 +693,7 @@ HttpFunctionService::ExecAction(const WorkerDesc& Worker, CbObject Action)
if (!DataBuffer)
{
- throw std::exception("Chunk missing" /* ADD CONTEXT */);
+ throw std::runtime_error("Chunk missing" /* ADD CONTEXT */);
}
zen::WriteFile(FilePath, DataBuffer);
@@ -780,18 +706,16 @@ HttpFunctionService::ExecAction(const WorkerDesc& Worker, CbObject Action)
// Manifest inputs in sandbox
Action.IterateAttachments([&](CbFieldView Field) {
- const IoHash Hash = Field.AsHash();
- std::filesystem::path FilePath{SandboxPath / "Inputs" / Hash.ToHexString()};
- IoBuffer DataBuffer = m_CasStore.FindChunk(Hash);
+ const IoHash Cid = Field.AsHash();
+ std::filesystem::path FilePath{SandboxPath / "Inputs" / Cid.ToHexString()};
+ IoBuffer DataBuffer = m_CidStore.FindChunkByCid(Cid);
if (!DataBuffer)
{
- throw std::exception("Chunk missing" /* ADD CONTEXT */);
+ throw std::runtime_error("Chunk missing" /* ADD CONTEXT */);
}
- CompressedBuffer Buffer = CompressedBuffer::Compress(SharedBuffer(std::move(DataBuffer)));
-
- zen::WriteFile(FilePath, Buffer.GetCompressed().Flatten().AsIoBuffer());
+ zen::WriteFile(FilePath, DataBuffer);
});
// Set up environment variables
@@ -884,7 +808,7 @@ HttpFunctionService::ExecAction(const WorkerDesc& Worker, CbObject Action)
ZEN_ASSERT(OutputData.Data.size() == 1);
- CbAttachment Attachment(SharedBuffer(ChunkData.Data[0]), Hash);
+ CbAttachment Attachment(CompressedBuffer::FromCompressed(SharedBuffer(ChunkData.Data[0])));
OutputPackage.AddAttachment(Attachment);
});