diff options
| author | Stefan Boberg <[email protected]> | 2021-08-24 21:10:10 +0200 |
|---|---|---|
| committer | Stefan Boberg <[email protected]> | 2021-08-24 21:10:10 +0200 |
| commit | b8958881f9bc20010a20953cabb3dda84f2960f8 (patch) | |
| tree | c057c8c32e7af99b47cebf004310da2865f5869c /zenserver/compute/apply.cpp | |
| parent | Fixed up drop logic (short circuiting fail!) (diff) | |
| download | zen-b8958881f9bc20010a20953cabb3dda84f2960f8.tar.xz zen-b8958881f9bc20010a20953cabb3dda84f2960f8.zip | |
Implemented function propagation
Diffstat (limited to 'zenserver/compute/apply.cpp')
| -rw-r--r-- | zenserver/compute/apply.cpp | 57 |
1 files changed, 56 insertions, 1 deletions
diff --git a/zenserver/compute/apply.cpp b/zenserver/compute/apply.cpp index 8a2750516..c07837e05 100644 --- a/zenserver/compute/apply.cpp +++ b/zenserver/compute/apply.cpp @@ -12,6 +12,8 @@ #include <zencore/windows.h> #include <zenstore/CAS.h> +#include <zencore/prewindows.h> + #include <AccCtrl.h> #include <AclAPI.h> #include <sddl.h> @@ -20,6 +22,9 @@ #pragma comment(lib, "UserEnv.lib") #include <atlbase.h> + +#include <zencore/postwindows.h> + #include <filesystem> #include <span> @@ -363,7 +368,10 @@ HttpFunctionService::HttpFunctionService(CasStore& Store, const std::filesystem: CasChunkSet ChunkSet; - FunctionSpec.IterateAttachments([&](CbFieldView Field) { ChunkSet.AddChunk(Field.GetHash()); }); + FunctionSpec.IterateAttachments([&](CbFieldView Field) { + const IoHash Hash = Field.AsHash(); + ChunkSet.AddChunk(Hash); + }); m_CasStore.FilterChunks(ChunkSet); @@ -373,6 +381,8 @@ HttpFunctionService::HttpFunctionService(CasStore& Store, const std::filesystem: m_WorkerMap.insert_or_assign(WorkerId, WorkerDesc{FunctionSpec}); + spdlog::debug("worker {}: all attachments already available", WorkerId); + return HttpReq.WriteResponse(HttpResponse::NoContent); } else @@ -382,11 +392,15 @@ HttpFunctionService::HttpFunctionService(CasStore& Store, const std::filesystem: for (const IoHash& Hash : ChunkSet.GetChunkSet()) { + spdlog::debug("worker {}: need chunk {}", WorkerId, Hash); + ResponseWriter.AddHash(Hash); } ResponseWriter.EndArray(); + spdlog::debug("worker {}: need {} attachments", WorkerId, ChunkSet.GetChunkSet().size()); + return HttpReq.WriteResponse(HttpResponse::NotFound, ResponseWriter.Save()); } } @@ -395,6 +409,47 @@ HttpFunctionService::HttpFunctionService(CasStore& Store, const std::filesystem: case HttpContentType::kCbPackage: { CbPackage FunctionSpec = HttpReq.ReadPayloadPackage(); + + CbObject Obj = FunctionSpec.GetObject(); + + std::span<const CbAttachment> Attachments = FunctionSpec.GetAttachments(); + + int AttachmentCount = 0; + int NewAttachmentCount = 0; + uint64_t TotalAttachmentBytes = 0; + uint64_t TotalNewBytes = 0; + + for (const CbAttachment& Attachment : Attachments) + { + ZEN_ASSERT(Attachment.IsBinary()); + + const IoHash DataHash = Attachment.GetHash(); + SharedBuffer DataView = Attachment.AsBinaryView(); + + TotalAttachmentBytes += DataView.GetSize(); + ++AttachmentCount; + + CasStore::InsertResult InsertResult = m_CasStore.InsertChunk(DataView.AsIoBuffer(), DataHash); + + if (InsertResult.New) + { + TotalNewBytes += DataView.GetSize(); + ++NewAttachmentCount; + } + } + + spdlog::debug("worker {}: {}B in {} attachments. {}B new ({} attachments)", + WorkerId, + zen::NiceBytes(TotalAttachmentBytes), + AttachmentCount, + zen::NiceBytes(TotalNewBytes), + NewAttachmentCount); + + RwLock::ExclusiveLockScope _(m_WorkerLock); + + m_WorkerMap.insert_or_assign(WorkerId, WorkerDesc{.Descriptor = Obj}); + + return HttpReq.WriteResponse(HttpResponse::NoContent); } break; } |