diff options
| author | Stefan Boberg <[email protected]> | 2021-09-15 19:49:20 +0200 |
|---|---|---|
| committer | Stefan Boberg <[email protected]> | 2021-09-15 19:49:20 +0200 |
| commit | 83ccd52321a23c8f1c8a3228cbbf34b8f199a22b (patch) | |
| tree | 9cf1fb68651f616aef2fa28000e4f328ef9204d8 /zenserver/compute/apply.cpp | |
| parent | Added GetSize/GetData functions to reduce cognitive load and bridge the gap b... (diff) | |
| parent | Tweaked logging to streamline access, and simplified setup code for new loggers (diff) | |
| download | zen-83ccd52321a23c8f1c8a3228cbbf34b8f199a22b.tar.xz zen-83ccd52321a23c8f1c8a3228cbbf34b8f199a22b.zip | |
Merge branch 'main' into cbpackage-update
Diffstat (limited to 'zenserver/compute/apply.cpp')
| -rw-r--r-- | zenserver/compute/apply.cpp | 188 |
1 files changed, 56 insertions, 132 deletions
diff --git a/zenserver/compute/apply.cpp b/zenserver/compute/apply.cpp index b46945d88..c3d83b2b5 100644 --- a/zenserver/compute/apply.cpp +++ b/zenserver/compute/apply.cpp @@ -89,7 +89,7 @@ BasicFunctionJob::SpawnJob(std::filesystem::path ExePath, std::wstring CommandLi m_ProcessHandle.Attach(ProcessInfo.hProcess); ::CloseHandle(ProcessInfo.hThread); - spdlog::info("Created process {}", m_ProcessId); + ZEN_INFO("Created process {}", m_ProcessId); return true; } @@ -114,7 +114,7 @@ BasicFunctionJob::Wait(uint32_t TimeoutMs) return true; } - throw std::exception("Failed wait on process handle"); + throw std::runtime_error("Failed wait on process handle"); } int @@ -125,12 +125,12 @@ BasicFunctionJob::ExitCode() if (!Success) { - spdlog::warn("failed getting exit code"); + ZEN_WARN("failed getting exit code"); } if (Ec == STILL_ACTIVE) { - spdlog::warn("getting exit code but process is STILL_ACTIVE"); + ZEN_WARN("getting exit code but process is STILL_ACTIVE"); } return gsl::narrow_cast<int>(Ec); @@ -231,7 +231,7 @@ SandboxedFunctionJob::Initialize(std::string_view AppContainerId) if (FAILED(hRes)) { - spdlog::error("Failed creating app container SID"); + ZEN_ERROR("Failed creating app container SID"); } } @@ -240,12 +240,12 @@ SandboxedFunctionJob::Initialize(std::string_view AppContainerId) PWSTR Str = nullptr; ::ConvertSidToStringSid(m_AppContainerSid, &Str); - spdlog::info("AppContainer SID : '{}'", WideToUtf8(Str)); + ZEN_INFO("AppContainer SID : '{}'", WideToUtf8(Str)); PWSTR Path = nullptr; if (SUCCEEDED(::GetAppContainerFolderPath(Str, &Path))) { - spdlog::info("AppContainer folder: '{}'", WideToUtf8(Path)); + ZEN_INFO("AppContainer folder: '{}'", WideToUtf8(Path)); ::CoTaskMemFree(Path); } @@ -321,13 +321,13 @@ SandboxedFunctionJob::SpawnJob(std::filesystem::path ExePath) return false; } - spdlog::info("Created process {}", ProcessInfo.dwProcessId); + ZEN_INFO("Created process {}", ProcessInfo.dwProcessId); return true; } HttpFunctionService::HttpFunctionService(CasStore& Store, CidStore& InCidStore, const std::filesystem::path& BaseDir) -: m_Log("apply", begin(spdlog::default_logger()->sinks()), end(spdlog::default_logger()->sinks())) +: m_Log(logging::Get("apply")) , m_CasStore(Store) , m_CidStore(InCidStore) , m_SandboxPath(BaseDir / "scratch") @@ -378,6 +378,10 @@ HttpFunctionService::HttpFunctionService(CasStore& Store, CidStore& InCidStore, ChunkSet.AddChunk(Hash); }); + // Note that we store executables uncompressed to make it + // more straightforward and efficient to materialize them, hence + // the CAS lookup here instead of CID for the input payloads + m_CasStore.FilterChunks(ChunkSet); if (ChunkSet.IsEmpty()) @@ -386,7 +390,7 @@ HttpFunctionService::HttpFunctionService(CasStore& Store, CidStore& InCidStore, m_WorkerMap.insert_or_assign(WorkerId, WorkerDesc{FunctionSpec}); - spdlog::debug("worker {}: all attachments already available", WorkerId); + ZEN_DEBUG("worker {}: all attachments already available", WorkerId); return HttpReq.WriteResponse(HttpResponseCode::NoContent); } @@ -397,14 +401,14 @@ HttpFunctionService::HttpFunctionService(CasStore& Store, CidStore& InCidStore, for (const IoHash& Hash : ChunkSet.GetChunkSet()) { - spdlog::debug("worker {}: need chunk {}", WorkerId, Hash); + ZEN_DEBUG("worker {}: need chunk {}", WorkerId, Hash); ResponseWriter.AddHash(Hash); } ResponseWriter.EndArray(); - spdlog::debug("worker {}: need {} attachments", WorkerId, ChunkSet.GetChunkSet().size()); + ZEN_DEBUG("worker {}: need {} attachments", WorkerId, ChunkSet.GetChunkSet().size()); return HttpReq.WriteResponse(HttpResponseCode::NotFound, ResponseWriter.Save()); } @@ -426,31 +430,35 @@ HttpFunctionService::HttpFunctionService(CasStore& Store, CidStore& InCidStore, for (const CbAttachment& Attachment : Attachments) { - ZEN_ASSERT(Attachment.IsBinary()); + ZEN_ASSERT(Attachment.IsCompressedBinary()); - const IoHash DataHash = Attachment.GetHash(); - CompressedBuffer DataView = Attachment.AsCompressedBinary(); + const IoHash DataHash = Attachment.GetHash(); + CompressedBuffer DataView = Attachment.AsCompressedBinary(); + SharedBuffer Decompressed = DataView.Decompress(); + const uint64_t DecompressedSize = DataView.GetRawSize(); - TotalAttachmentBytes += DataView.GetCompressedSize(); + TotalAttachmentBytes += DecompressedSize; ++AttachmentCount; - IoBuffer Payload = DataView.GetCompressed().Flatten().AsIoBuffer(); + // Note that we store executables uncompressed to make it + // more straightforward and efficient to materialize them - CasStore::InsertResult InsertResult = m_CasStore.InsertChunk(Payload, DataHash); + const CasStore::InsertResult InsertResult = + m_CasStore.InsertChunk(Decompressed.AsIoBuffer(), IoHash::FromBLAKE3(DataView.GetRawHash())); if (InsertResult.New) { - TotalNewBytes += Payload.Size(); + TotalNewBytes += DecompressedSize; ++NewAttachmentCount; } } - spdlog::debug("worker {}: {} in {} attachments, {} in {} new attachments", - WorkerId, - zen::NiceBytes(TotalAttachmentBytes), - AttachmentCount, - zen::NiceBytes(TotalNewBytes), - NewAttachmentCount); + ZEN_DEBUG("worker {}: {} in {} attachments, {} in {} new attachments", + WorkerId, + zen::NiceBytes(TotalAttachmentBytes), + AttachmentCount, + zen::NiceBytes(TotalNewBytes), + NewAttachmentCount); RwLock::ExclusiveLockScope _(m_WorkerLock); @@ -526,7 +534,7 @@ HttpFunctionService::HttpFunctionService(CasStore& Store, CidStore& InCidStore, RequestObject.IterateAttachments([&](CbFieldView Field) { const IoHash FileHash = Field.AsHash(); - if (!m_CasStore.FindChunk(FileHash)) + if (!m_CidStore.ContainsChunk(FileHash)) { NeedList.push_back(FileHash); } @@ -570,28 +578,30 @@ HttpFunctionService::HttpFunctionService(CasStore& Store, CidStore& InCidStore, for (const CbAttachment& Attachment : Attachments) { - ZEN_ASSERT(Attachment.IsBinary()); + ZEN_ASSERT(Attachment.IsCompressedBinary()); + + const IoHash DataHash = Attachment.GetHash(); + CompressedBuffer DataView = Attachment.AsCompressedBinary(); - const IoHash DataHash = Attachment.GetHash(); - SharedBuffer DataView = Attachment.AsBinary(); + const uint64_t CompressedSize = DataView.GetCompressedSize(); - TotalAttachmentBytes += DataView.GetSize(); + TotalAttachmentBytes += CompressedSize; ++AttachmentCount; - CasStore::InsertResult InsertResult = m_CasStore.InsertChunk(DataView.AsIoBuffer(), DataHash); + const CasStore::InsertResult InsertResult = m_CidStore.AddChunk(DataView); if (InsertResult.New) { - TotalNewBytes += DataView.GetSize(); + TotalNewBytes += CompressedSize; ++NewAttachmentCount; } } - spdlog::debug("new action: {}B in {} attachments. {}B new ({} attachments)", - zen::NiceBytes(TotalAttachmentBytes), - AttachmentCount, - zen::NiceBytes(TotalNewBytes), - NewAttachmentCount); + ZEN_DEBUG("new action: {} in {} attachments. {} new ({} attachments)", + zen::NiceBytes(TotalAttachmentBytes), + AttachmentCount, + zen::NiceBytes(TotalNewBytes), + NewAttachmentCount); CbPackage Output = ExecAction(Worker, ActionObj); @@ -603,90 +613,6 @@ HttpFunctionService::HttpFunctionService(CasStore& Store, CidStore& InCidStore, } }, HttpVerb::kPost); - - // This is just for reference - m_Router.RegisterRoute( - "jobs/noop", - [this](HttpRouterRequest& Req) { - HttpServerRequest& HttpReq = Req.ServerRequest(); - - switch (HttpReq.RequestVerb()) - { - case HttpVerb::kGet: - break; - - case HttpVerb::kPost: - { - IoBuffer Payload = HttpReq.ReadPayload(); - CbObject RequestObject = LoadCompactBinaryObject(Payload); - - bool AllOk = true; - - std::vector<IoHash> NeedList; - - std::filesystem::path SandboxDir{CreateNewSandbox()}; - - spdlog::debug("setting up job in sandbox '{}'", SandboxDir); - - zen::DeleteDirectories(SandboxDir); - zen::CreateDirectories(SandboxDir); - - for (auto Entry : RequestObject["files"sv]) - { - CbObjectView Ob = Entry.AsObjectView(); - - std::string_view FileName = Ob["file"sv].AsString(); - const IoHash FileHash = Ob["hash"sv].AsHash(); - uint64_t FileSize = Ob["size"sv].AsUInt64(); - - if (IoBuffer Chunk = m_CasStore.FindChunk(FileHash); !Chunk) - { - spdlog::debug("MISSING: {} {} {}", FileHash, FileName, FileSize); - AllOk = false; - - NeedList.push_back(FileHash); - } - else - { - std::filesystem::path FullPath = SandboxDir / FileName; - - const IoBuffer* Chunks[] = {&Chunk}; - - zen::WriteFile(FullPath, Chunks, 1); - } - } - - if (!AllOk) - { - // TODO: Could report all the missing pieces in the response here - return HttpReq.WriteResponse(HttpResponseCode::NotFound); - } - - std::string Executable8{RequestObject["cmd"].AsString()}; - std::string Args8{RequestObject["args"].AsString()}; - - std::wstring Executable = Utf8ToWide(Executable8); - std::wstring Args = Utf8ToWide(Args8); - - spdlog::debug("spawning job in sandbox '{}': '{}' '{}'", SandboxDir, Executable8, Args8); - - std::filesystem::path ExeName = SandboxDir / Executable; - - BasicFunctionJob Job; - Job.SetWorkingDirectory(SandboxDir); - Job.SpawnJob(ExeName, Args); - Job.Wait(); - - CbObjectWriter Response; - - Response << "exitcode" << Job.ExitCode(); - - return HttpReq.WriteResponse(HttpResponseCode::OK, Response.Save()); - } - break; - } - }, - HttpVerb::kGet | HttpVerb::kPost); } HttpFunctionService::~HttpFunctionService() @@ -704,7 +630,7 @@ HttpFunctionService::HandleRequest(HttpServerRequest& Request) { if (m_Router.HandleRequest(Request) == false) { - m_Log.warn("No route found for {0}", Request.RelativeUri()); + ZEN_WARN("No route found for {0}", Request.RelativeUri()); } } @@ -741,7 +667,7 @@ HttpFunctionService::ExecAction(const WorkerDesc& Worker, CbObject Action) if (!DataBuffer) { - throw std::exception("Chunk missing" /* ADD CONTEXT */); + throw std::runtime_error("Chunk missing" /* ADD CONTEXT */); } zen::WriteFile(FilePath, DataBuffer); @@ -767,7 +693,7 @@ HttpFunctionService::ExecAction(const WorkerDesc& Worker, CbObject Action) if (!DataBuffer) { - throw std::exception("Chunk missing" /* ADD CONTEXT */); + throw std::runtime_error("Chunk missing" /* ADD CONTEXT */); } zen::WriteFile(FilePath, DataBuffer); @@ -780,18 +706,16 @@ HttpFunctionService::ExecAction(const WorkerDesc& Worker, CbObject Action) // Manifest inputs in sandbox Action.IterateAttachments([&](CbFieldView Field) { - const IoHash Hash = Field.AsHash(); - std::filesystem::path FilePath{SandboxPath / "Inputs" / Hash.ToHexString()}; - IoBuffer DataBuffer = m_CasStore.FindChunk(Hash); + const IoHash Cid = Field.AsHash(); + std::filesystem::path FilePath{SandboxPath / "Inputs" / Cid.ToHexString()}; + IoBuffer DataBuffer = m_CidStore.FindChunkByCid(Cid); if (!DataBuffer) { - throw std::exception("Chunk missing" /* ADD CONTEXT */); + throw std::runtime_error("Chunk missing" /* ADD CONTEXT */); } - CompressedBuffer Buffer = CompressedBuffer::Compress(SharedBuffer(std::move(DataBuffer))); - - zen::WriteFile(FilePath, Buffer.GetCompressed().Flatten().AsIoBuffer()); + zen::WriteFile(FilePath, DataBuffer); }); // Set up environment variables @@ -884,7 +808,7 @@ HttpFunctionService::ExecAction(const WorkerDesc& Worker, CbObject Action) ZEN_ASSERT(OutputData.Data.size() == 1); - CbAttachment Attachment(SharedBuffer(ChunkData.Data[0]), Hash); + CbAttachment Attachment(CompressedBuffer::FromCompressed(SharedBuffer(ChunkData.Data[0]))); OutputPackage.AddAttachment(Attachment); }); |