aboutsummaryrefslogtreecommitdiff
path: root/zenserver/compute/apply.cpp
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2021-08-24 21:10:10 +0200
committerStefan Boberg <[email protected]>2021-08-24 21:10:10 +0200
commitb8958881f9bc20010a20953cabb3dda84f2960f8 (patch)
treec057c8c32e7af99b47cebf004310da2865f5869c /zenserver/compute/apply.cpp
parentFixed up drop logic (short circuiting fail!) (diff)
downloadzen-b8958881f9bc20010a20953cabb3dda84f2960f8.tar.xz
zen-b8958881f9bc20010a20953cabb3dda84f2960f8.zip
Implemented function propagation
Diffstat (limited to 'zenserver/compute/apply.cpp')
-rw-r--r--zenserver/compute/apply.cpp57
1 files changed, 56 insertions, 1 deletions
diff --git a/zenserver/compute/apply.cpp b/zenserver/compute/apply.cpp
index 8a2750516..c07837e05 100644
--- a/zenserver/compute/apply.cpp
+++ b/zenserver/compute/apply.cpp
@@ -12,6 +12,8 @@
#include <zencore/windows.h>
#include <zenstore/CAS.h>
+#include <zencore/prewindows.h>
+
#include <AccCtrl.h>
#include <AclAPI.h>
#include <sddl.h>
@@ -20,6 +22,9 @@
#pragma comment(lib, "UserEnv.lib")
#include <atlbase.h>
+
+#include <zencore/postwindows.h>
+
#include <filesystem>
#include <span>
@@ -363,7 +368,10 @@ HttpFunctionService::HttpFunctionService(CasStore& Store, const std::filesystem:
CasChunkSet ChunkSet;
- FunctionSpec.IterateAttachments([&](CbFieldView Field) { ChunkSet.AddChunk(Field.GetHash()); });
+ FunctionSpec.IterateAttachments([&](CbFieldView Field) {
+ const IoHash Hash = Field.AsHash();
+ ChunkSet.AddChunk(Hash);
+ });
m_CasStore.FilterChunks(ChunkSet);
@@ -373,6 +381,8 @@ HttpFunctionService::HttpFunctionService(CasStore& Store, const std::filesystem:
m_WorkerMap.insert_or_assign(WorkerId, WorkerDesc{FunctionSpec});
+ spdlog::debug("worker {}: all attachments already available", WorkerId);
+
return HttpReq.WriteResponse(HttpResponse::NoContent);
}
else
@@ -382,11 +392,15 @@ HttpFunctionService::HttpFunctionService(CasStore& Store, const std::filesystem:
for (const IoHash& Hash : ChunkSet.GetChunkSet())
{
+ spdlog::debug("worker {}: need chunk {}", WorkerId, Hash);
+
ResponseWriter.AddHash(Hash);
}
ResponseWriter.EndArray();
+ spdlog::debug("worker {}: need {} attachments", WorkerId, ChunkSet.GetChunkSet().size());
+
return HttpReq.WriteResponse(HttpResponse::NotFound, ResponseWriter.Save());
}
}
@@ -395,6 +409,47 @@ HttpFunctionService::HttpFunctionService(CasStore& Store, const std::filesystem:
case HttpContentType::kCbPackage:
{
CbPackage FunctionSpec = HttpReq.ReadPayloadPackage();
+
+ CbObject Obj = FunctionSpec.GetObject();
+
+ std::span<const CbAttachment> Attachments = FunctionSpec.GetAttachments();
+
+ int AttachmentCount = 0;
+ int NewAttachmentCount = 0;
+ uint64_t TotalAttachmentBytes = 0;
+ uint64_t TotalNewBytes = 0;
+
+ for (const CbAttachment& Attachment : Attachments)
+ {
+ ZEN_ASSERT(Attachment.IsBinary());
+
+ const IoHash DataHash = Attachment.GetHash();
+ SharedBuffer DataView = Attachment.AsBinaryView();
+
+ TotalAttachmentBytes += DataView.GetSize();
+ ++AttachmentCount;
+
+ CasStore::InsertResult InsertResult = m_CasStore.InsertChunk(DataView.AsIoBuffer(), DataHash);
+
+ if (InsertResult.New)
+ {
+ TotalNewBytes += DataView.GetSize();
+ ++NewAttachmentCount;
+ }
+ }
+
+ spdlog::debug("worker {}: {}B in {} attachments. {}B new ({} attachments)",
+ WorkerId,
+ zen::NiceBytes(TotalAttachmentBytes),
+ AttachmentCount,
+ zen::NiceBytes(TotalNewBytes),
+ NewAttachmentCount);
+
+ RwLock::ExclusiveLockScope _(m_WorkerLock);
+
+ m_WorkerMap.insert_or_assign(WorkerId, WorkerDesc{.Descriptor = Obj});
+
+ return HttpReq.WriteResponse(HttpResponse::NoContent);
}
break;
}