From 4584fd6e56fa5c5a7428828e7d3aea4e25a17977 Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Mon, 11 Nov 2024 09:46:09 +0100 Subject: WIP allow control of size for batch iteration allow adding compositebuffers as attachments directly add batch2 httpstore api to allow batching of CAS & Oid with range requests allow responses with file handles from project store Signed-off-by: Dan Engelbrecht --- src/zencore/include/zencore/compactbinarypackage.h | 5 +- src/zenserver/cache/httpstructuredcache.cpp | 6 +- src/zenserver/projectstore/httpprojectstore.cpp | 170 +++++++++++ src/zenserver/projectstore/httpprojectstore.h | 1 + src/zenserver/projectstore/projectstore.cpp | 319 ++++++++++++++++----- src/zenserver/projectstore/projectstore.h | 8 +- src/zenstore/cas.cpp | 12 +- src/zenstore/cas.h | 3 +- src/zenstore/cidstore.cpp | 10 +- src/zenstore/compactcas.cpp | 6 +- src/zenstore/compactcas.h | 3 +- src/zenstore/include/zenstore/cidstore.h | 3 +- 12 files changed, 453 insertions(+), 93 deletions(-) diff --git a/src/zencore/include/zencore/compactbinarypackage.h b/src/zencore/include/zencore/compactbinarypackage.h index fe4a60a30..81c026b15 100644 --- a/src/zencore/include/zencore/compactbinarypackage.h +++ b/src/zencore/include/zencore/compactbinarypackage.h @@ -64,6 +64,9 @@ public: ZENCORE_API CbAttachment(const CompressedBuffer& InValue, const IoHash& Hash); ZENCORE_API CbAttachment(CompressedBuffer&& InValue, const IoHash& Hash); + ZENCORE_API explicit CbAttachment(CompositeBuffer&& InValue); + ZENCORE_API CbAttachment(CompositeBuffer&& InValue, const IoHash& Hash); + /** Reset this to a null attachment. */ inline void Reset() { *this = CbAttachment(); } @@ -129,8 +132,6 @@ public: private: ZENCORE_API CbAttachment(const CbObject& Value, const IoHash* Hash); - ZENCORE_API explicit CbAttachment(CompositeBuffer&& InValue); - ZENCORE_API CbAttachment(CompositeBuffer&& InValue, const IoHash& Hash); IoHash Hash; std::variant Value; diff --git a/src/zenserver/cache/httpstructuredcache.cpp b/src/zenserver/cache/httpstructuredcache.cpp index 551b5a76d..26c56a314 100644 --- a/src/zenserver/cache/httpstructuredcache.cpp +++ b/src/zenserver/cache/httpstructuredcache.cpp @@ -672,7 +672,8 @@ HttpStructuredCacheService::HandleCacheNamespaceRequest(HttpServerRequest& Reque AttachmentsSize += Payload.GetSize(); return true; }, - &WorkerPool); + &WorkerPool, + 0); ResponseWriter << "Count" << AllAttachments.size(); ResponseWriter << "Size" << AttachmentsSize; @@ -765,7 +766,8 @@ HttpStructuredCacheService::HandleCacheBucketRequest(HttpServerRequest& Request, AttachmentsSize += Payload.GetSize(); return true; }, - &WorkerPool); + &WorkerPool, + 0); ResponseWriter << "AttachmentsSize" << AttachmentsSize; } diff --git a/src/zenserver/projectstore/httpprojectstore.cpp b/src/zenserver/projectstore/httpprojectstore.cpp index 1b45e66f3..d2e138791 100644 --- a/src/zenserver/projectstore/httpprojectstore.cpp +++ b/src/zenserver/projectstore/httpprojectstore.cpp @@ -14,6 +14,7 @@ #include #include #include +#include namespace zen { @@ -256,6 +257,11 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects, [this](HttpRouterRequest& Req) { HandleChunkBatchRequest(Req); }, HttpVerb::kPost); + m_Router.RegisterRoute( + "{project}/oplog/{log}/batch2", + [this](HttpRouterRequest& Req) { HandleChunkBatch2Request(Req); }, + HttpVerb::kPost); + m_Router.RegisterRoute( "{project}/oplog/{log}/files", [this](HttpRouterRequest& Req) { HandleFilesRequest(Req); }, @@ -455,6 +461,170 @@ HttpProjectService::HandleProjectListRequest(HttpRouterRequest& Req) HttpReq.WriteResponse(HttpResponseCode::OK, ProjectsList); } +void +HttpProjectService::HandleChunkBatch2Request(HttpRouterRequest& Req) +{ + ZEN_TRACE_CPU("ProjectService::ChunkBatch2"); + + HttpServerRequest& HttpReq = Req.ServerRequest(); + const auto& ProjectId = Req.GetCapture(1); + const auto& OplogId = Req.GetCapture(2); + + Ref Project = m_ProjectStore->OpenProject(ProjectId); + if (!Project) + { + return HttpReq.WriteResponse(HttpResponseCode::NotFound); + } + Project->TouchProject(); + + ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId, /*AllowCompact*/ true, /*VerifyPathOnDisk*/ false); + if (!FoundLog) + { + return HttpReq.WriteResponse(HttpResponseCode::NotFound); + } + Project->TouchOplog(OplogId); + + // Parse Request + + IoBuffer Payload = HttpReq.ReadPayload(); + BinaryReader Reader(Payload); + + struct RequestHeader + { + enum + { + kMagic = 0xAAAA'77AC + }; + uint32_t Magic; + uint32_t ChunkCount; + uint32_t Reserved1; + uint32_t Reserved2; + }; + + struct RequestChunkEntry + { + Oid ChunkId; + uint32_t CorrelationId; + uint64_t Offset; + uint64_t RequestBytes; + }; + + if (Payload.Size() <= sizeof(RequestHeader)) + { + m_ProjectStats.BadRequestCount++; + HttpReq.WriteResponse(HttpResponseCode::BadRequest); + } + + RequestHeader RequestHdr; + Reader.Read(&RequestHdr, sizeof RequestHdr); + + if (RequestHdr.Magic != RequestHeader::kMagic) + { + m_ProjectStats.BadRequestCount++; + HttpReq.WriteResponse(HttpResponseCode::BadRequest); + } + + std::vector RequestedChunks; + RequestedChunks.resize(RequestHdr.ChunkCount); + Reader.Read(RequestedChunks.data(), sizeof(RequestChunkEntry) * RequestHdr.ChunkCount); + + // Make Response + + struct ResponseHeader + { + uint32_t Magic = 0xbada'b00f; + uint32_t ChunkCount; + uint32_t Reserved1 = 0; + uint32_t Reserved2 = 0; + }; + + struct ResponseChunkEntry + { + uint32_t CorrelationId; + uint32_t Flags = 0; + uint64_t ChunkSize; + }; + + std::vector OutBlobs; + OutBlobs.emplace_back(sizeof(ResponseHeader) + RequestHdr.ChunkCount * sizeof(ResponseChunkEntry)); + + std::vector ChunkIds; + (void)FoundLog->IterateChunks( + ChunkIds, + [&](size_t Index, const IoBuffer& Payload) { + try + { + const RequestChunkEntry& RequestedChunk = RequestedChunks[Index]; + if (Payload.GetContentType() == ZenContentType::kCompressedBinary) + { + IoHash RawHash; + uint64_t RawSize; + if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Payload), RawHash, RawSize)) + { + if (SharedBuffer Decompressed = Compressed.Decompress(RequestedChunk.Offset, RequestedChunk.RequestBytes)) + { + OutBlobs[Index + 1] = Decompressed.AsIoBuffer(); + } + } + } + else + { + if (RequestedChunk.Offset > 0 || RequestedChunk.RequestBytes < uint64_t(-1)) + { + uint64_t Offset = RequestedChunk.Offset; + if (Offset > Payload.Size()) + { + Offset = Payload.Size(); + } + uint64_t Size = RequestedChunk.RequestBytes; + if ((Offset + Size) > Payload.Size()) + { + Size = Payload.Size() - Offset; + } + OutBlobs[Index + 1] = IoBuffer(Payload, Offset, Size); + OutBlobs[Index + 1].MakeOwned(); + } + } + return true; + } + catch (const std::exception& Ex) + { + ZEN_WARN("oplog '{}/{}': failed getting chunk in batch request for id {}. Reason: '{}'", + ProjectId, + OplogId, + ChunkIds[Index], + Ex.what()); + } + return true; + }, + &GetSmallWorkerPool(EWorkloadType::Burst), + 0); + + uint8_t* ResponsePtr = reinterpret_cast(OutBlobs[0].MutableData()); + ResponseHeader ResponseHdr; + ResponseHdr.ChunkCount = RequestHdr.ChunkCount; + memcpy(ResponsePtr, &ResponseHdr, sizeof(ResponseHdr)); + ResponsePtr += sizeof(ResponseHdr); + for (uint32_t ChunkIndex = 0; ChunkIndex < RequestHdr.ChunkCount; ++ChunkIndex) + { + const IoBuffer& FoundChunk(OutBlobs[ChunkIndex + 1]); + ResponseChunkEntry ResponseChunk; + ResponseChunk.CorrelationId = RequestedChunks[ChunkIndex].CorrelationId; + if (FoundChunk) + { + ResponseChunk.ChunkSize = FoundChunk.Size(); + } + else + { + ResponseChunk.ChunkSize = uint64_t(-1); + } + memcpy(ResponsePtr, &ResponseChunk, sizeof(ResponseChunk)); + ResponsePtr += sizeof(ResponseChunk); + } + m_ProjectStats.ChunkHitCount += RequestHdr.ChunkCount; + return HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kBinary, OutBlobs); +} + void HttpProjectService::HandleChunkBatchRequest(HttpRouterRequest& Req) { diff --git a/src/zenserver/projectstore/httpprojectstore.h b/src/zenserver/projectstore/httpprojectstore.h index 9990ee264..fa9567963 100644 --- a/src/zenserver/projectstore/httpprojectstore.h +++ b/src/zenserver/projectstore/httpprojectstore.h @@ -63,6 +63,7 @@ private: void HandleProjectListRequest(HttpRouterRequest& Req); void HandleChunkBatchRequest(HttpRouterRequest& Req); + void HandleChunkBatch2Request(HttpRouterRequest& Req); void HandleFilesRequest(HttpRouterRequest& Req); void HandleChunkInfosRequest(HttpRouterRequest& Req); void HandleChunkInfoRequest(HttpRouterRequest& Req); diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp index 5d9d7aa24..c2bddf532 100644 --- a/src/zenserver/projectstore/projectstore.cpp +++ b/src/zenserver/projectstore/projectstore.cpp @@ -1799,15 +1799,17 @@ ProjectStore::Oplog::GetChunkByRawHash(const IoHash& RawHash) bool ProjectStore::Oplog::IterateChunks(std::span RawHashes, const std::function& AsyncCallback, - WorkerThreadPool* OptionalWorkerPool) + WorkerThreadPool* OptionalWorkerPool, + uint64_t LargeSizeLimit) { - return m_CidStore.IterateChunks(RawHashes, AsyncCallback, OptionalWorkerPool); + return m_CidStore.IterateChunks(RawHashes, AsyncCallback, OptionalWorkerPool, LargeSizeLimit); } bool ProjectStore::Oplog::IterateChunks(std::span ChunkIds, const std::function& AsyncCallback, - WorkerThreadPool* OptionalWorkerPool) + WorkerThreadPool* OptionalWorkerPool, + uint64_t LargeSizeLimit) { std::vector CidChunkIndexes; std::vector CidChunkHashes; @@ -1838,7 +1840,8 @@ ProjectStore::Oplog::IterateChunks(std::span ChunkIds, m_CidStore.IterateChunks( CidChunkHashes, [&](size_t Index, const IoBuffer& Payload) { return AsyncCallback(CidChunkIndexes[Index], Payload); }, - OptionalWorkerPool); + OptionalWorkerPool, + LargeSizeLimit); if (OptionalWorkerPool) { @@ -3807,7 +3810,8 @@ ProjectStore::GetProjectFiles(const std::string_view ProjectId, } return true; }, - &GetSmallWorkerPool(EWorkloadType::Burst)); + &GetSmallWorkerPool(EWorkloadType::Burst), + 0); } CbObjectWriter Response; @@ -3931,7 +3935,8 @@ ProjectStore::GetProjectChunkInfos(const std::string_view ProjectId, } return true; }, - &WorkerPool); + &WorkerPool, + 0); } CbObjectWriter Response; @@ -4040,38 +4045,19 @@ ProjectStore::GetChunkRange(const std::string_view ProjectId, return GetChunkRange(ProjectId, OplogId, Obj, Offset, Size, AcceptType, OutChunk, OutContentType); } -std::pair -ProjectStore::GetChunkRange(const std::string_view ProjectId, - const std::string_view OplogId, - Oid ChunkId, - uint64_t Offset, - uint64_t Size, - ZenContentType AcceptType, - CompositeBuffer& OutChunk, - ZenContentType& OutContentType) +static std::pair +GetChunkRange(IoBuffer&& Chunk, + const std::string_view ProjectId, + const std::string_view OplogId, + const std::string_view ChunkId, + uint64_t Offset, + uint64_t Size, + ZenContentType AcceptType, + CompositeBuffer& OutChunk, + ZenContentType& OutContentType) { bool IsOffset = Offset != 0 || Size != ~(0ull); - Ref Project = OpenProject(ProjectId); - if (!Project) - { - return {HttpResponseCode::NotFound, fmt::format("Chunk request for unknown project '{}'", ProjectId)}; - } - Project->TouchProject(); - - ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId, /*AllowCompact*/ true, /*VerifyPathOnDisk*/ false); - if (!FoundLog) - { - return {HttpResponseCode::NotFound, fmt::format("Chunk request for unknown oplog '{}/{}'", ProjectId, OplogId)}; - } - Project->TouchOplog(OplogId); - - IoBuffer Chunk = FoundLog->FindChunk(ChunkId); - if (!Chunk) - { - return {HttpResponseCode::NotFound, {}}; - } - OutContentType = Chunk.GetContentType(); if (OutContentType == ZenContentType::kCompressedBinary) @@ -4151,10 +4137,42 @@ ProjectStore::GetChunkRange(const std::string_view ProjectId, { OutChunk = CompositeBuffer(SharedBuffer(std::move(Chunk))); } - return {HttpResponseCode::OK, {}}; } +std::pair +ProjectStore::GetChunkRange(const std::string_view ProjectId, + const std::string_view OplogId, + Oid ChunkId, + uint64_t Offset, + uint64_t Size, + ZenContentType AcceptType, + CompositeBuffer& OutChunk, + ZenContentType& OutContentType) +{ + Ref Project = OpenProject(ProjectId); + if (!Project) + { + return {HttpResponseCode::NotFound, fmt::format("Chunk request for unknown project '{}'", ProjectId)}; + } + Project->TouchProject(); + + ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId, /*AllowCompact*/ true, /*VerifyPathOnDisk*/ false); + if (!FoundLog) + { + return {HttpResponseCode::NotFound, fmt::format("Chunk request for unknown oplog '{}/{}'", ProjectId, OplogId)}; + } + Project->TouchOplog(OplogId); + + IoBuffer Chunk = FoundLog->FindChunk(ChunkId); + if (!Chunk) + { + return {HttpResponseCode::NotFound, {}}; + } + + return zen::GetChunkRange(std::move(Chunk), ProjectId, OplogId, ChunkId.ToString(), Offset, Size, AcceptType, OutChunk, OutContentType); +} + std::pair ProjectStore::GetChunk(const std::string_view ProjectId, const std::string_view OplogId, @@ -4512,66 +4530,219 @@ ProjectStore::Rpc(HttpServerRequest& HttpReq, { ZEN_TRACE_CPU("Store::Rpc::getchunks"); CbPackage ResponsePackage; + + uint64_t LargeSizeLimit = 0; // Go with default in-memory size + + RpcAcceptOptions AcceptFlags = static_cast(Cb["AcceptFlags"sv].AsUInt16(0u)); + FormatFlags Flags = FormatFlags::kDefault; + int TargetPid = Cb["Pid"sv].AsInt32(0); + void* TargetProcessHandle = nullptr; + + if (EnumHasAllFlags(AcceptFlags, RpcAcceptOptions::kAllowLocalReferences)) + { + Flags |= FormatFlags::kAllowLocalReferences; + TargetProcessHandle = m_OpenProcessCache.GetProcessHandle(HttpReq.SessionId(), TargetPid); + if (!EnumHasAnyFlags(AcceptFlags, RpcAcceptOptions::kAllowPartialLocalReferences)) + { + Flags |= FormatFlags::kDenyPartialLocalReferences; + } + else if (TargetProcessHandle != nullptr) + { + // If we allow partial references to blocks, only put very small chunks in memory + LargeSizeLimit = 1024u; + } + } + { - CbArrayView ChunksArray = Cb["chunks"sv].AsArrayView(); - std::vector ChunkHashes; + CbArrayView ChunksArray = Cb["chunks"sv].AsArrayView(); + std::vector ChunkHashes; + std::vector ChunkHashIndexes; + std::vector ChunkIds; + std::vector ChunkIdIndexes; + std::vector ChunkRangeOffsets; + std::vector ChunkRangeSizes; + ChunkHashes.reserve(ChunksArray.Num()); + size_t IndexOffset = 0; for (CbFieldView FieldView : ChunksArray) { - ChunkHashes.push_back(FieldView.AsHash()); + if (FieldView.IsHash()) + { + ChunkHashes.push_back(FieldView.AsHash()); + ChunkHashIndexes.push_back(IndexOffset); + } + else if (FieldView.IsObjectId()) + { + ChunkIds.push_back(FieldView.AsObjectId()); + ChunkIdIndexes.push_back(IndexOffset); + } + IndexOffset++; } + CbArrayView RangeOffsetsArray = Cb["offsets"sv].AsArrayView(); + ChunkRangeOffsets.reserve(ChunksArray.Num()); + for (CbFieldView FieldView : RangeOffsetsArray) + { + ChunkRangeOffsets.push_back(FieldView.AsUInt64(0)); + } + ChunkRangeOffsets.resize(ChunksArray.Num(), 0); + CbArrayView RangeSizesArray = Cb["sizes"sv].AsArrayView(); + ChunkRangeSizes.reserve(ChunksArray.Num()); + for (CbFieldView FieldView : RangeSizesArray) + { + ChunkRangeSizes.push_back(FieldView.AsUInt64((uint64_t)-1)); + } + ChunkRangeSizes.resize(ChunksArray.Num(), (uint64_t)-1); - std::vector CompressedBuffers; - CompressedBuffers.resize(ChunksArray.Num()); - (void)m_CidStore.IterateChunks( - ChunkHashes, - [&](size_t Index, const IoBuffer& Payload) { - try - { - IoHash RawHash; - uint64_t RawSize; - CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Payload), RawHash, RawSize); - if (Compressed && ChunkHashes[Index] == RawHash) + std::vector ChunkBuffers; + if (!ChunkHashes.empty()) + { + ChunkBuffers.resize(ChunkHashes.size()); + (void)m_CidStore.IterateChunks( + ChunkHashes, + [&](size_t Index, const IoBuffer& Payload) { + try { - CompressedBuffers[Index] = Compressed.MakeOwned(); + size_t RequestIndex = ChunkHashIndexes[Index]; + ZenContentType ContentType; + CompositeBuffer Chunk; + std::pair GetRangeResponse = + zen::GetChunkRange(IoBuffer(Payload), + ProjectId, + OplogId, + ChunkHashes[Index].ToHexString(), + ChunkRangeOffsets[RequestIndex], + ChunkRangeSizes[RequestIndex], + ZenContentType::kCompressedBinary, + Chunk, + ContentType); + if (GetRangeResponse.first == HttpResponseCode::OK) + { + ZEN_ASSERT(ContentType == ZenContentType::kCompressedBinary); + ChunkBuffers[Index] = CompressedBuffer::FromCompressedNoValidate(std::move(Chunk)).MakeOwned(); + } + else + { + ZEN_WARN("oplog '{}/{}': failed getting chunk in batch request for chunk {}. Reason: '{}'", + ProjectId, + OplogId, + ChunkHashes[Index], + GetRangeResponse.second); + } + return true; } - else + catch (const std::exception& Ex) { - ZEN_WARN("oplog '{}/{}': invalid compressed binary in cas store for {}", + ZEN_WARN("oplog '{}/{}': failed getting chunk in batch request for chunk {}. Reason: '{}'", ProjectId, OplogId, - ChunkHashes[Index]); + ChunkHashes[Index], + Ex.what()); } return true; - } - catch (const std::exception& Ex) + }, + &GetSmallWorkerPool(EWorkloadType::Burst), + LargeSizeLimit); + } + std::vector> ValueBuffers; + if (!ChunkIds.empty()) + { + ValueBuffers.resize(ChunkIds.size()); + Oplog->IterateChunks( + ChunkIds, + [&](size_t Index, const IoBuffer& Payload) { + try + { + size_t RequestIndex = ChunkIdIndexes[Index]; + CompositeBuffer Chunk; + std::pair GetRangeResponse = + zen::GetChunkRange(IoBuffer(Payload), + ProjectId, + OplogId, + ChunkHashes[Index].ToHexString(), + ChunkRangeOffsets[RequestIndex], + ChunkRangeSizes[RequestIndex], + ZenContentType::kCompressedBinary, + Chunk, + ValueBuffers[Index].first); + if (GetRangeResponse.first == HttpResponseCode::OK) + { + ValueBuffers[Index].second = std::move(Chunk).MakeOwned(); + } + else + { + ZEN_WARN("oplog '{}/{}': failed getting chunk in batch request for chunk {}. Reason: '{}'", + ProjectId, + OplogId, + ChunkHashes[Index], + GetRangeResponse.second); + } + return true; + } + catch (const std::exception& Ex) + { + ZEN_WARN("oplog '{}/{}': failed getting chunk in batch request for chunk {}. Reason: '{}'", + ProjectId, + OplogId, + ChunkHashes[Index], + Ex.what()); + } + return true; + }, + &GetSmallWorkerPool(EWorkloadType::Burst), + LargeSizeLimit); + } + + CbObjectWriter ResponseWriter; + if (!ChunkHashes.empty()) + { + ResponseWriter.BeginArray("chunks"sv); + for (size_t Index = 0; Index < ChunkHashes.size(); Index++) + { + CompressedBuffer& Compressed = ChunkBuffers[Index]; + if (Compressed) { - ZEN_WARN("oplog '{}/{}': failed getting chunk in batch request for chunk {}. Reason: '{}'", - ProjectId, - OplogId, - ChunkHashes[Index], - Ex.what()); + const IoHash& RawHash = ChunkHashes[Index]; + ResponseWriter.AddHash(RawHash); + ResponsePackage.AddAttachment(CbAttachment(std::move(Compressed), RawHash)); } - return true; - }, - &GetSmallWorkerPool(EWorkloadType::Burst)); + } + ResponseWriter.EndArray(); + } - CbObjectWriter ResponseWriter; - ResponseWriter.BeginArray("chunks"sv); - for (size_t Index = 0; Index < ChunkHashes.size(); Index++) + if (!ChunkIds.empty()) { - CompressedBuffer& Compressed = CompressedBuffers[Index]; - if (Compressed) + ResponseWriter.BeginArray("values"sv); + for (size_t Index = 0; Index < ChunkIds.size(); Index++) { - const IoHash& RawHash = ChunkHashes[Index]; - ResponseWriter.AddHash(RawHash); - ResponsePackage.AddAttachment(CbAttachment(std::move(Compressed), RawHash)); + CompositeBuffer& Composite = ValueBuffers[Index].second; + if (Composite) + { + ResponseWriter.BeginObject(); + const Oid& Id = ChunkIds[Index]; + ResponseWriter.AddObjectId("id", Id); + if (ValueBuffers[Index].first == ZenContentType::kCompressedBinary) + { + IoHash RawHash; + uint64_t _; + CompressedBuffer Compressed = CompressedBuffer::FromCompressed(std::move(Composite), RawHash, _); + ResponseWriter.AddHash("rawhash", RawHash); + ResponsePackage.AddAttachment(CbAttachment(std::move(Compressed), RawHash)); + } + else + { + IoHash RawHash = IoHash::HashBuffer(Composite); + ResponseWriter.AddHash("rawhash", RawHash); + ResponsePackage.AddAttachment(CbAttachment(std::move(Composite), RawHash)); + } + ResponseWriter.EndObject(); + } } + ResponseWriter.EndArray(); } - ResponseWriter.EndArray(); ResponsePackage.SetObject(ResponseWriter.Save()); } - CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(ResponsePackage, FormatFlags::kDefault); + + CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(ResponsePackage, Flags, TargetProcessHandle); HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer); return true; } diff --git a/src/zenserver/projectstore/projectstore.h b/src/zenserver/projectstore/projectstore.h index 49970b677..52debe271 100644 --- a/src/zenserver/projectstore/projectstore.h +++ b/src/zenserver/projectstore/projectstore.h @@ -7,6 +7,7 @@ #include #include #include +#include ZEN_THIRD_PARTY_INCLUDES_START #include @@ -118,10 +119,12 @@ public: IoBuffer GetChunkByRawHash(const IoHash& RawHash); bool IterateChunks(std::span RawHashes, const std::function& AsyncCallback, - WorkerThreadPool* OptionalWorkerPool); + WorkerThreadPool* OptionalWorkerPool, + uint64_t LargeSizeLimit); bool IterateChunks(std::span ChunkIds, const std::function& AsyncCallback, - WorkerThreadPool* OptionalWorkerPool); + WorkerThreadPool* OptionalWorkerPool, + uint64_t LargeSizeLimit); inline static const uint32_t kInvalidOp = ~0u; /** Persist a new oplog entry @@ -469,6 +472,7 @@ private: mutable RwLock m_UpdateCaptureLock; uint32_t m_UpdateCaptureRefCounter = 0; std::unique_ptr> m_CapturedProjects; + OpenProcessCache m_OpenProcessCache; std::filesystem::path BasePathForProject(std::string_view ProjectId); diff --git a/src/zenstore/cas.cpp b/src/zenstore/cas.cpp index bff221fc7..4252fc859 100644 --- a/src/zenstore/cas.cpp +++ b/src/zenstore/cas.cpp @@ -59,7 +59,8 @@ public: virtual void FilterChunks(HashKeySet& InOutChunks) override; virtual bool IterateChunks(std::span DecompressedIds, const std::function& AsyncCallback, - WorkerThreadPool* OptionalWorkerPool) override; + WorkerThreadPool* OptionalWorkerPool, + uint64_t LargeSizeLimit) override; virtual void Flush() override; virtual void ScrubStorage(ScrubContext& Ctx) override; virtual CidStoreSize TotalSize() const override; @@ -392,7 +393,8 @@ CasImpl::FilterChunks(HashKeySet& InOutChunks) bool CasImpl::IterateChunks(std::span DecompressedIds, const std::function& AsyncCallback, - WorkerThreadPool* OptionalWorkerPool) + WorkerThreadPool* OptionalWorkerPool, + uint64_t LargeSizeLimit) { ZEN_TRACE_CPU("CAS::IterateChunks"); if (!m_SmallStrategy.IterateChunks( @@ -402,7 +404,8 @@ CasImpl::IterateChunks(std::span DecompressedIds, Chunk.SetContentType(ZenContentType::kCompressedBinary); return AsyncCallback(Index, Payload); }, - OptionalWorkerPool)) + OptionalWorkerPool, + LargeSizeLimit)) { return false; } @@ -413,7 +416,8 @@ CasImpl::IterateChunks(std::span DecompressedIds, Chunk.SetContentType(ZenContentType::kCompressedBinary); return AsyncCallback(Index, Payload); }, - OptionalWorkerPool)) + OptionalWorkerPool, + LargeSizeLimit)) { return false; } diff --git a/src/zenstore/cas.h b/src/zenstore/cas.h index bedbc6a9a..e279dd2cc 100644 --- a/src/zenstore/cas.h +++ b/src/zenstore/cas.h @@ -47,7 +47,8 @@ public: virtual void FilterChunks(HashKeySet& InOutChunks) = 0; virtual bool IterateChunks(std::span DecompressedIds, const std::function& AsyncCallback, - WorkerThreadPool* OptionalWorkerPool) = 0; + WorkerThreadPool* OptionalWorkerPool, + uint64_t LargeSizeLimit) = 0; virtual void Flush() = 0; virtual void ScrubStorage(ScrubContext& Ctx) = 0; virtual CidStoreSize TotalSize() const = 0; diff --git a/src/zenstore/cidstore.cpp b/src/zenstore/cidstore.cpp index 71fd596f4..2ab769d04 100644 --- a/src/zenstore/cidstore.cpp +++ b/src/zenstore/cidstore.cpp @@ -119,9 +119,10 @@ struct CidStore::Impl bool IterateChunks(std::span DecompressedIds, const std::function& AsyncCallback, - WorkerThreadPool* OptionalWorkerPool) + WorkerThreadPool* OptionalWorkerPool, + uint64_t LargeSizeLimit) { - return m_CasStore.IterateChunks(DecompressedIds, AsyncCallback, OptionalWorkerPool); + return m_CasStore.IterateChunks(DecompressedIds, AsyncCallback, OptionalWorkerPool, LargeSizeLimit); } void Flush() { m_CasStore.Flush(); } @@ -217,9 +218,10 @@ CidStore::ContainsChunk(const IoHash& DecompressedId) bool CidStore::IterateChunks(std::span DecompressedIds, const std::function& AsyncCallback, - WorkerThreadPool* OptionalWorkerPool) + WorkerThreadPool* OptionalWorkerPool, + uint64_t LargeSizeLimit) { - return m_Impl->IterateChunks(DecompressedIds, AsyncCallback, OptionalWorkerPool); + return m_Impl->IterateChunks(DecompressedIds, AsyncCallback, OptionalWorkerPool, LargeSizeLimit); } void diff --git a/src/zenstore/compactcas.cpp b/src/zenstore/compactcas.cpp index 7f1300177..f479a7173 100644 --- a/src/zenstore/compactcas.cpp +++ b/src/zenstore/compactcas.cpp @@ -305,7 +305,8 @@ CasContainerStrategy::FilterChunks(HashKeySet& InOutChunks) bool CasContainerStrategy::IterateChunks(std::span ChunkHashes, const std::function& AsyncCallback, - WorkerThreadPool* OptionalWorkerPool) + WorkerThreadPool* OptionalWorkerPool, + uint64_t LargeSizeLimit) { std::vector FoundChunkIndexes; std::vector FoundChunkLocations; @@ -332,7 +333,8 @@ CasContainerStrategy::IterateChunks(std::span ChunkHashes, }, [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) { return AsyncCallback(FoundChunkIndexes[ChunkIndex], File.GetChunk(Offset, Size)); - }); + }, + LargeSizeLimit); }; Latch WorkLatch(1); diff --git a/src/zenstore/compactcas.h b/src/zenstore/compactcas.h index 44567e7a0..07e620086 100644 --- a/src/zenstore/compactcas.h +++ b/src/zenstore/compactcas.h @@ -58,7 +58,8 @@ struct CasContainerStrategy final : public GcStorage, public GcReferenceStore void FilterChunks(HashKeySet& InOutChunks); bool IterateChunks(std::span ChunkHashes, const std::function& AsyncCallback, - WorkerThreadPool* OptionalWorkerPool); + WorkerThreadPool* OptionalWorkerPool, + uint64_t LargeSizeLimit); void Initialize(const std::filesystem::path& RootDirectory, const std::string_view ContainerBaseName, uint32_t MaxBlockSize, diff --git a/src/zenstore/include/zenstore/cidstore.h b/src/zenstore/include/zenstore/cidstore.h index d95fa7cd4..b3d00fec0 100644 --- a/src/zenstore/include/zenstore/cidstore.h +++ b/src/zenstore/include/zenstore/cidstore.h @@ -82,7 +82,8 @@ public: virtual IoBuffer FindChunkByCid(const IoHash& DecompressedId) override; bool IterateChunks(std::span DecompressedIds, const std::function& AsyncCallback, - WorkerThreadPool* OptionalWorkerPool); + WorkerThreadPool* OptionalWorkerPool, + uint64_t LargeSizeLimit); bool ContainsChunk(const IoHash& DecompressedId); void FilterChunks(HashKeySet& InOutChunks); void Flush(); -- cgit v1.2.3