aboutsummaryrefslogtreecommitdiff
path: root/zenserver/compute/apply.cpp
diff options
context:
space:
mode:
authorMartin Ridgers <[email protected]>2021-09-15 09:22:32 +0200
committerMartin Ridgers <[email protected]>2021-09-15 09:23:33 +0200
commit8f5e773529858223beeecf5d1b69c23991df644e (patch)
tree2c360c67e028f5ecd7368212b0adf8b23578ff9d /zenserver/compute/apply.cpp
parentUse zen::Sleep() in timer.cpp's tests (diff)
parentUpdated function service to new package management API (diff)
downloadzen-8f5e773529858223beeecf5d1b69c23991df644e.tar.xz
zen-8f5e773529858223beeecf5d1b69c23991df644e.zip
Merge main
Diffstat (limited to 'zenserver/compute/apply.cpp')
-rw-r--r--zenserver/compute/apply.cpp72
1 files changed, 40 insertions, 32 deletions
diff --git a/zenserver/compute/apply.cpp b/zenserver/compute/apply.cpp
index 939ac3362..94dedf087 100644
--- a/zenserver/compute/apply.cpp
+++ b/zenserver/compute/apply.cpp
@@ -351,12 +351,12 @@ HttpFunctionService::HttpFunctionService(CasStore& Store, CidStore& InCidStore,
if (auto It = m_WorkerMap.find(WorkerId); It == m_WorkerMap.end())
{
- return HttpReq.WriteResponse(HttpResponse::NotFound);
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound);
}
else
{
const WorkerDesc& Desc = It->second;
- return HttpReq.WriteResponse(HttpResponse::OK, Desc.Descriptor);
+ return HttpReq.WriteResponse(HttpResponseCode::OK, Desc.Descriptor);
}
}
break;
@@ -378,6 +378,10 @@ HttpFunctionService::HttpFunctionService(CasStore& Store, CidStore& InCidStore,
ChunkSet.AddChunk(Hash);
});
+ // Note that we store executables uncompressed to make it
+ // more straightforward and efficient to materialize them, hence
+ // the CAS lookup here instead of CID for the input payloads
+
m_CasStore.FilterChunks(ChunkSet);
if (ChunkSet.IsEmpty())
@@ -388,7 +392,7 @@ HttpFunctionService::HttpFunctionService(CasStore& Store, CidStore& InCidStore,
spdlog::debug("worker {}: all attachments already available", WorkerId);
- return HttpReq.WriteResponse(HttpResponse::NoContent);
+ return HttpReq.WriteResponse(HttpResponseCode::NoContent);
}
else
{
@@ -406,7 +410,7 @@ HttpFunctionService::HttpFunctionService(CasStore& Store, CidStore& InCidStore,
spdlog::debug("worker {}: need {} attachments", WorkerId, ChunkSet.GetChunkSet().size());
- return HttpReq.WriteResponse(HttpResponse::NotFound, ResponseWriter.Save());
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound, ResponseWriter.Save());
}
}
break;
@@ -426,21 +430,25 @@ HttpFunctionService::HttpFunctionService(CasStore& Store, CidStore& InCidStore,
for (const CbAttachment& Attachment : Attachments)
{
- ZEN_ASSERT(Attachment.IsBinary());
+ ZEN_ASSERT(Attachment.IsCompressedBinary());
- const IoHash DataHash = Attachment.GetHash();
- CompressedBuffer DataView = Attachment.AsCompressedBinary();
+ const IoHash DataHash = Attachment.GetHash();
+ CompressedBuffer DataView = Attachment.AsCompressedBinary();
+ SharedBuffer Decompressed = DataView.Decompress();
+ const uint64_t DecompressedSize = DataView.GetRawSize();
- TotalAttachmentBytes += DataView.GetCompressedSize();
+ TotalAttachmentBytes += DecompressedSize;
++AttachmentCount;
- IoBuffer Payload = DataView.GetCompressed().Flatten().AsIoBuffer();
+ // Note that we store executables uncompressed to make it
+ // more straightforward and efficient to materialize them
- CasStore::InsertResult InsertResult = m_CasStore.InsertChunk(Payload, DataHash);
+ const CasStore::InsertResult InsertResult =
+ m_CasStore.InsertChunk(Decompressed.AsIoBuffer(), IoHash::FromBLAKE3(DataView.GetRawHash()));
if (InsertResult.New)
{
- TotalNewBytes += Payload.Size();
+ TotalNewBytes += DecompressedSize;
++NewAttachmentCount;
}
}
@@ -456,7 +464,7 @@ HttpFunctionService::HttpFunctionService(CasStore& Store, CidStore& InCidStore,
m_WorkerMap.insert_or_assign(WorkerId, WorkerDesc{.Descriptor = Obj});
- return HttpReq.WriteResponse(HttpResponse::NoContent);
+ return HttpReq.WriteResponse(HttpResponseCode::NoContent);
}
break;
}
@@ -495,7 +503,7 @@ HttpFunctionService::HttpFunctionService(CasStore& Store, CidStore& InCidStore,
if (auto It = m_WorkerMap.find(WorkerId); It == m_WorkerMap.end())
{
- return HttpReq.WriteResponse(HttpResponse::NotFound);
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound);
}
else
{
@@ -526,7 +534,7 @@ HttpFunctionService::HttpFunctionService(CasStore& Store, CidStore& InCidStore,
RequestObject.IterateAttachments([&](CbFieldView Field) {
const IoHash FileHash = Field.AsHash();
- if (!m_CasStore.FindChunk(FileHash))
+ if (!m_CidStore.ContainsChunk(FileHash))
{
NeedList.push_back(FileHash);
}
@@ -538,7 +546,7 @@ HttpFunctionService::HttpFunctionService(CasStore& Store, CidStore& InCidStore,
CbPackage Output = ExecAction(Worker, RequestObject);
- return HttpReq.WriteResponse(HttpResponse::OK, Output);
+ return HttpReq.WriteResponse(HttpResponseCode::OK, Output);
}
CbObjectWriter Cbo;
@@ -552,7 +560,7 @@ HttpFunctionService::HttpFunctionService(CasStore& Store, CidStore& InCidStore,
Cbo.EndArray();
CbObject Response = Cbo.Save();
- return HttpReq.WriteResponse(HttpResponse::NotFound, Response);
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound, Response);
}
break;
@@ -570,19 +578,21 @@ HttpFunctionService::HttpFunctionService(CasStore& Store, CidStore& InCidStore,
for (const CbAttachment& Attachment : Attachments)
{
- ZEN_ASSERT(Attachment.IsBinary());
+ ZEN_ASSERT(Attachment.IsCompressedBinary());
+
+ const IoHash DataHash = Attachment.GetHash();
+ CompressedBuffer DataView = Attachment.AsCompressedBinary();
- const IoHash DataHash = Attachment.GetHash();
- SharedBuffer DataView = Attachment.AsBinary();
+ const uint64_t CompressedSize = DataView.GetCompressedSize();
- TotalAttachmentBytes += DataView.GetSize();
+ TotalAttachmentBytes += CompressedSize;
++AttachmentCount;
- CasStore::InsertResult InsertResult = m_CasStore.InsertChunk(DataView.AsIoBuffer(), DataHash);
+ const CasStore::InsertResult InsertResult = m_CidStore.AddChunk(DataView);
if (InsertResult.New)
{
- TotalNewBytes += DataView.GetSize();
+ TotalNewBytes += CompressedSize;
++NewAttachmentCount;
}
}
@@ -595,7 +605,7 @@ HttpFunctionService::HttpFunctionService(CasStore& Store, CidStore& InCidStore,
CbPackage Output = ExecAction(Worker, ActionObj);
- return HttpReq.WriteResponse(HttpResponse::OK, Output);
+ return HttpReq.WriteResponse(HttpResponseCode::OK, Output);
}
break;
}
@@ -659,7 +669,7 @@ HttpFunctionService::HttpFunctionService(CasStore& Store, CidStore& InCidStore,
if (!AllOk)
{
// TODO: Could report all the missing pieces in the response here
- return HttpReq.WriteResponse(HttpResponse::NotFound);
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound);
}
std::string Executable8{RequestObject["cmd"].AsString()};
@@ -681,7 +691,7 @@ HttpFunctionService::HttpFunctionService(CasStore& Store, CidStore& InCidStore,
Response << "exitcode" << Job.ExitCode();
- return HttpReq.WriteResponse(HttpResponse::OK, Response.Save());
+ return HttpReq.WriteResponse(HttpResponseCode::OK, Response.Save());
}
break;
}
@@ -780,18 +790,16 @@ HttpFunctionService::ExecAction(const WorkerDesc& Worker, CbObject Action)
// Manifest inputs in sandbox
Action.IterateAttachments([&](CbFieldView Field) {
- const IoHash Hash = Field.AsHash();
- std::filesystem::path FilePath{SandboxPath / "Inputs" / Hash.ToHexString()};
- IoBuffer DataBuffer = m_CasStore.FindChunk(Hash);
+ const IoHash Cid = Field.AsHash();
+ std::filesystem::path FilePath{SandboxPath / "Inputs" / Cid.ToHexString()};
+ IoBuffer DataBuffer = m_CidStore.FindChunkByCid(Cid);
if (!DataBuffer)
{
throw std::exception("Chunk missing" /* ADD CONTEXT */);
}
- CompressedBuffer Buffer = CompressedBuffer::Compress(SharedBuffer(std::move(DataBuffer)));
-
- zen::WriteFile(FilePath, Buffer.GetCompressed().Flatten().AsIoBuffer());
+ zen::WriteFile(FilePath, DataBuffer);
});
// Set up environment variables
@@ -884,7 +892,7 @@ HttpFunctionService::ExecAction(const WorkerDesc& Worker, CbObject Action)
ZEN_ASSERT(OutputData.Data.size() == 1);
- CbAttachment Attachment(SharedBuffer(ChunkData.Data[0]), Hash);
+ CbAttachment Attachment(CompressedBuffer::FromCompressed(SharedBuffer(ChunkData.Data[0])));
OutputPackage.AddAttachment(Attachment);
});