diff options
Diffstat (limited to 'zenserver/compute/apply.cpp')
| -rw-r--r-- | zenserver/compute/apply.cpp | 72 |
1 files changed, 40 insertions, 32 deletions
diff --git a/zenserver/compute/apply.cpp b/zenserver/compute/apply.cpp index 939ac3362..94dedf087 100644 --- a/zenserver/compute/apply.cpp +++ b/zenserver/compute/apply.cpp @@ -351,12 +351,12 @@ HttpFunctionService::HttpFunctionService(CasStore& Store, CidStore& InCidStore, if (auto It = m_WorkerMap.find(WorkerId); It == m_WorkerMap.end()) { - return HttpReq.WriteResponse(HttpResponse::NotFound); + return HttpReq.WriteResponse(HttpResponseCode::NotFound); } else { const WorkerDesc& Desc = It->second; - return HttpReq.WriteResponse(HttpResponse::OK, Desc.Descriptor); + return HttpReq.WriteResponse(HttpResponseCode::OK, Desc.Descriptor); } } break; @@ -378,6 +378,10 @@ HttpFunctionService::HttpFunctionService(CasStore& Store, CidStore& InCidStore, ChunkSet.AddChunk(Hash); }); + // Note that we store executables uncompressed to make it + // more straightforward and efficient to materialize them, hence + // the CAS lookup here instead of CID for the input payloads + m_CasStore.FilterChunks(ChunkSet); if (ChunkSet.IsEmpty()) @@ -388,7 +392,7 @@ HttpFunctionService::HttpFunctionService(CasStore& Store, CidStore& InCidStore, spdlog::debug("worker {}: all attachments already available", WorkerId); - return HttpReq.WriteResponse(HttpResponse::NoContent); + return HttpReq.WriteResponse(HttpResponseCode::NoContent); } else { @@ -406,7 +410,7 @@ HttpFunctionService::HttpFunctionService(CasStore& Store, CidStore& InCidStore, spdlog::debug("worker {}: need {} attachments", WorkerId, ChunkSet.GetChunkSet().size()); - return HttpReq.WriteResponse(HttpResponse::NotFound, ResponseWriter.Save()); + return HttpReq.WriteResponse(HttpResponseCode::NotFound, ResponseWriter.Save()); } } break; @@ -426,21 +430,25 @@ HttpFunctionService::HttpFunctionService(CasStore& Store, CidStore& InCidStore, for (const CbAttachment& Attachment : Attachments) { - ZEN_ASSERT(Attachment.IsBinary()); + ZEN_ASSERT(Attachment.IsCompressedBinary()); - const IoHash DataHash = Attachment.GetHash(); - CompressedBuffer DataView = Attachment.AsCompressedBinary(); + const IoHash DataHash = Attachment.GetHash(); + CompressedBuffer DataView = Attachment.AsCompressedBinary(); + SharedBuffer Decompressed = DataView.Decompress(); + const uint64_t DecompressedSize = DataView.GetRawSize(); - TotalAttachmentBytes += DataView.GetCompressedSize(); + TotalAttachmentBytes += DecompressedSize; ++AttachmentCount; - IoBuffer Payload = DataView.GetCompressed().Flatten().AsIoBuffer(); + // Note that we store executables uncompressed to make it + // more straightforward and efficient to materialize them - CasStore::InsertResult InsertResult = m_CasStore.InsertChunk(Payload, DataHash); + const CasStore::InsertResult InsertResult = + m_CasStore.InsertChunk(Decompressed.AsIoBuffer(), IoHash::FromBLAKE3(DataView.GetRawHash())); if (InsertResult.New) { - TotalNewBytes += Payload.Size(); + TotalNewBytes += DecompressedSize; ++NewAttachmentCount; } } @@ -456,7 +464,7 @@ HttpFunctionService::HttpFunctionService(CasStore& Store, CidStore& InCidStore, m_WorkerMap.insert_or_assign(WorkerId, WorkerDesc{.Descriptor = Obj}); - return HttpReq.WriteResponse(HttpResponse::NoContent); + return HttpReq.WriteResponse(HttpResponseCode::NoContent); } break; } @@ -495,7 +503,7 @@ HttpFunctionService::HttpFunctionService(CasStore& Store, CidStore& InCidStore, if (auto It = m_WorkerMap.find(WorkerId); It == m_WorkerMap.end()) { - return HttpReq.WriteResponse(HttpResponse::NotFound); + return HttpReq.WriteResponse(HttpResponseCode::NotFound); } else { @@ -526,7 +534,7 @@ HttpFunctionService::HttpFunctionService(CasStore& Store, CidStore& InCidStore, RequestObject.IterateAttachments([&](CbFieldView Field) { const IoHash FileHash = Field.AsHash(); - if (!m_CasStore.FindChunk(FileHash)) + if (!m_CidStore.ContainsChunk(FileHash)) { NeedList.push_back(FileHash); } @@ -538,7 +546,7 @@ HttpFunctionService::HttpFunctionService(CasStore& Store, CidStore& InCidStore, CbPackage Output = ExecAction(Worker, RequestObject); - return HttpReq.WriteResponse(HttpResponse::OK, Output); + return HttpReq.WriteResponse(HttpResponseCode::OK, Output); } CbObjectWriter Cbo; @@ -552,7 +560,7 @@ HttpFunctionService::HttpFunctionService(CasStore& Store, CidStore& InCidStore, Cbo.EndArray(); CbObject Response = Cbo.Save(); - return HttpReq.WriteResponse(HttpResponse::NotFound, Response); + return HttpReq.WriteResponse(HttpResponseCode::NotFound, Response); } break; @@ -570,19 +578,21 @@ HttpFunctionService::HttpFunctionService(CasStore& Store, CidStore& InCidStore, for (const CbAttachment& Attachment : Attachments) { - ZEN_ASSERT(Attachment.IsBinary()); + ZEN_ASSERT(Attachment.IsCompressedBinary()); + + const IoHash DataHash = Attachment.GetHash(); + CompressedBuffer DataView = Attachment.AsCompressedBinary(); - const IoHash DataHash = Attachment.GetHash(); - SharedBuffer DataView = Attachment.AsBinary(); + const uint64_t CompressedSize = DataView.GetCompressedSize(); - TotalAttachmentBytes += DataView.GetSize(); + TotalAttachmentBytes += CompressedSize; ++AttachmentCount; - CasStore::InsertResult InsertResult = m_CasStore.InsertChunk(DataView.AsIoBuffer(), DataHash); + const CasStore::InsertResult InsertResult = m_CidStore.AddChunk(DataView); if (InsertResult.New) { - TotalNewBytes += DataView.GetSize(); + TotalNewBytes += CompressedSize; ++NewAttachmentCount; } } @@ -595,7 +605,7 @@ HttpFunctionService::HttpFunctionService(CasStore& Store, CidStore& InCidStore, CbPackage Output = ExecAction(Worker, ActionObj); - return HttpReq.WriteResponse(HttpResponse::OK, Output); + return HttpReq.WriteResponse(HttpResponseCode::OK, Output); } break; } @@ -659,7 +669,7 @@ HttpFunctionService::HttpFunctionService(CasStore& Store, CidStore& InCidStore, if (!AllOk) { // TODO: Could report all the missing pieces in the response here - return HttpReq.WriteResponse(HttpResponse::NotFound); + return HttpReq.WriteResponse(HttpResponseCode::NotFound); } std::string Executable8{RequestObject["cmd"].AsString()}; @@ -681,7 +691,7 @@ HttpFunctionService::HttpFunctionService(CasStore& Store, CidStore& InCidStore, Response << "exitcode" << Job.ExitCode(); - return HttpReq.WriteResponse(HttpResponse::OK, Response.Save()); + return HttpReq.WriteResponse(HttpResponseCode::OK, Response.Save()); } break; } @@ -780,18 +790,16 @@ HttpFunctionService::ExecAction(const WorkerDesc& Worker, CbObject Action) // Manifest inputs in sandbox Action.IterateAttachments([&](CbFieldView Field) { - const IoHash Hash = Field.AsHash(); - std::filesystem::path FilePath{SandboxPath / "Inputs" / Hash.ToHexString()}; - IoBuffer DataBuffer = m_CasStore.FindChunk(Hash); + const IoHash Cid = Field.AsHash(); + std::filesystem::path FilePath{SandboxPath / "Inputs" / Cid.ToHexString()}; + IoBuffer DataBuffer = m_CidStore.FindChunkByCid(Cid); if (!DataBuffer) { throw std::exception("Chunk missing" /* ADD CONTEXT */); } - CompressedBuffer Buffer = CompressedBuffer::Compress(SharedBuffer(std::move(DataBuffer))); - - zen::WriteFile(FilePath, Buffer.GetCompressed().Flatten().AsIoBuffer()); + zen::WriteFile(FilePath, DataBuffer); }); // Set up environment variables @@ -884,7 +892,7 @@ HttpFunctionService::ExecAction(const WorkerDesc& Worker, CbObject Action) ZEN_ASSERT(OutputData.Data.size() == 1); - CbAttachment Attachment(SharedBuffer(ChunkData.Data[0]), Hash); + CbAttachment Attachment(CompressedBuffer::FromCompressed(SharedBuffer(ChunkData.Data[0]))); OutputPackage.AddAttachment(Attachment); }); |