diff options
| author | Dan Engelbrecht <[email protected]> | 2024-11-11 09:46:09 +0100 |
|---|---|---|
| committer | Dan Engelbrecht <[email protected]> | 2024-11-11 09:46:09 +0100 |
| commit | 4584fd6e56fa5c5a7428828e7d3aea4e25a17977 (patch) | |
| tree | c7f0ae3ea387585fb167fb9f5dfc3ecad8918e34 /src | |
| parent | use IterateChunks for "getchunks" projectstore rpc request (diff) | |
| download | zen-de/improved-projectstore-batch-requests.tar.xz zen-de/improved-projectstore-batch-requests.zip | |
Allow control of the batch size used for chunk iteration.
Allow adding CompositeBuffers as package attachments directly.
Add a "batch2" HTTP store API that batches CAS and Oid lookups with byte-range requests.
Allow project-store responses to be served from file handles.
Signed-off-by: Dan Engelbrecht <[email protected]>
Diffstat (limited to 'src')
| -rw-r--r-- | src/zencore/include/zencore/compactbinarypackage.h | 5 | ||||
| -rw-r--r-- | src/zenserver/cache/httpstructuredcache.cpp | 6 | ||||
| -rw-r--r-- | src/zenserver/projectstore/httpprojectstore.cpp | 170 | ||||
| -rw-r--r-- | src/zenserver/projectstore/httpprojectstore.h | 1 | ||||
| -rw-r--r-- | src/zenserver/projectstore/projectstore.cpp | 319 | ||||
| -rw-r--r-- | src/zenserver/projectstore/projectstore.h | 8 | ||||
| -rw-r--r-- | src/zenstore/cas.cpp | 12 | ||||
| -rw-r--r-- | src/zenstore/cas.h | 3 | ||||
| -rw-r--r-- | src/zenstore/cidstore.cpp | 10 | ||||
| -rw-r--r-- | src/zenstore/compactcas.cpp | 6 | ||||
| -rw-r--r-- | src/zenstore/compactcas.h | 3 | ||||
| -rw-r--r-- | src/zenstore/include/zenstore/cidstore.h | 3 |
12 files changed, 453 insertions, 93 deletions
diff --git a/src/zencore/include/zencore/compactbinarypackage.h b/src/zencore/include/zencore/compactbinarypackage.h index fe4a60a30..81c026b15 100644 --- a/src/zencore/include/zencore/compactbinarypackage.h +++ b/src/zencore/include/zencore/compactbinarypackage.h @@ -64,6 +64,9 @@ public: ZENCORE_API CbAttachment(const CompressedBuffer& InValue, const IoHash& Hash); ZENCORE_API CbAttachment(CompressedBuffer&& InValue, const IoHash& Hash); + ZENCORE_API explicit CbAttachment(CompositeBuffer&& InValue); + ZENCORE_API CbAttachment(CompositeBuffer&& InValue, const IoHash& Hash); + /** Reset this to a null attachment. */ inline void Reset() { *this = CbAttachment(); } @@ -129,8 +132,6 @@ public: private: ZENCORE_API CbAttachment(const CbObject& Value, const IoHash* Hash); - ZENCORE_API explicit CbAttachment(CompositeBuffer&& InValue); - ZENCORE_API CbAttachment(CompositeBuffer&& InValue, const IoHash& Hash); IoHash Hash; std::variant<std::nullptr_t, CbObject, CompositeBuffer, CompressedBuffer> Value; diff --git a/src/zenserver/cache/httpstructuredcache.cpp b/src/zenserver/cache/httpstructuredcache.cpp index 551b5a76d..26c56a314 100644 --- a/src/zenserver/cache/httpstructuredcache.cpp +++ b/src/zenserver/cache/httpstructuredcache.cpp @@ -672,7 +672,8 @@ HttpStructuredCacheService::HandleCacheNamespaceRequest(HttpServerRequest& Reque AttachmentsSize += Payload.GetSize(); return true; }, - &WorkerPool); + &WorkerPool, + 0); ResponseWriter << "Count" << AllAttachments.size(); ResponseWriter << "Size" << AttachmentsSize; @@ -765,7 +766,8 @@ HttpStructuredCacheService::HandleCacheBucketRequest(HttpServerRequest& Request, AttachmentsSize += Payload.GetSize(); return true; }, - &WorkerPool); + &WorkerPool, + 0); ResponseWriter << "AttachmentsSize" << AttachmentsSize; } diff --git a/src/zenserver/projectstore/httpprojectstore.cpp b/src/zenserver/projectstore/httpprojectstore.cpp index 1b45e66f3..d2e138791 100644 --- a/src/zenserver/projectstore/httpprojectstore.cpp +++ 
b/src/zenserver/projectstore/httpprojectstore.cpp @@ -14,6 +14,7 @@ #include <zencore/stream.h> #include <zencore/trace.h> #include <zenstore/zenstore.h> +#include <zenutil/workerpools.h> namespace zen { @@ -257,6 +258,11 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects, HttpVerb::kPost); m_Router.RegisterRoute( + "{project}/oplog/{log}/batch2", + [this](HttpRouterRequest& Req) { HandleChunkBatch2Request(Req); }, + HttpVerb::kPost); + + m_Router.RegisterRoute( "{project}/oplog/{log}/files", [this](HttpRouterRequest& Req) { HandleFilesRequest(Req); }, HttpVerb::kGet); @@ -456,6 +462,170 @@ HttpProjectService::HandleProjectListRequest(HttpRouterRequest& Req) } void +HttpProjectService::HandleChunkBatch2Request(HttpRouterRequest& Req) +{ + ZEN_TRACE_CPU("ProjectService::ChunkBatch2"); + + HttpServerRequest& HttpReq = Req.ServerRequest(); + const auto& ProjectId = Req.GetCapture(1); + const auto& OplogId = Req.GetCapture(2); + + Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId); + if (!Project) + { + return HttpReq.WriteResponse(HttpResponseCode::NotFound); + } + Project->TouchProject(); + + ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId, /*AllowCompact*/ true, /*VerifyPathOnDisk*/ false); + if (!FoundLog) + { + return HttpReq.WriteResponse(HttpResponseCode::NotFound); + } + Project->TouchOplog(OplogId); + + // Parse Request + + IoBuffer Payload = HttpReq.ReadPayload(); + BinaryReader Reader(Payload); + + struct RequestHeader + { + enum + { + kMagic = 0xAAAA'77AC + }; + uint32_t Magic; + uint32_t ChunkCount; + uint32_t Reserved1; + uint32_t Reserved2; + }; + + struct RequestChunkEntry + { + Oid ChunkId; + uint32_t CorrelationId; + uint64_t Offset; + uint64_t RequestBytes; + }; + + if (Payload.Size() <= sizeof(RequestHeader)) + { + m_ProjectStats.BadRequestCount++; + HttpReq.WriteResponse(HttpResponseCode::BadRequest); + } + + RequestHeader RequestHdr; + Reader.Read(&RequestHdr, sizeof RequestHdr); + 
+ if (RequestHdr.Magic != RequestHeader::kMagic) + { + m_ProjectStats.BadRequestCount++; + HttpReq.WriteResponse(HttpResponseCode::BadRequest); + } + + std::vector<RequestChunkEntry> RequestedChunks; + RequestedChunks.resize(RequestHdr.ChunkCount); + Reader.Read(RequestedChunks.data(), sizeof(RequestChunkEntry) * RequestHdr.ChunkCount); + + // Make Response + + struct ResponseHeader + { + uint32_t Magic = 0xbada'b00f; + uint32_t ChunkCount; + uint32_t Reserved1 = 0; + uint32_t Reserved2 = 0; + }; + + struct ResponseChunkEntry + { + uint32_t CorrelationId; + uint32_t Flags = 0; + uint64_t ChunkSize; + }; + + std::vector<IoBuffer> OutBlobs; + OutBlobs.emplace_back(sizeof(ResponseHeader) + RequestHdr.ChunkCount * sizeof(ResponseChunkEntry)); + + std::vector<Oid> ChunkIds; + (void)FoundLog->IterateChunks( + ChunkIds, + [&](size_t Index, const IoBuffer& Payload) { + try + { + const RequestChunkEntry& RequestedChunk = RequestedChunks[Index]; + if (Payload.GetContentType() == ZenContentType::kCompressedBinary) + { + IoHash RawHash; + uint64_t RawSize; + if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Payload), RawHash, RawSize)) + { + if (SharedBuffer Decompressed = Compressed.Decompress(RequestedChunk.Offset, RequestedChunk.RequestBytes)) + { + OutBlobs[Index + 1] = Decompressed.AsIoBuffer(); + } + } + } + else + { + if (RequestedChunk.Offset > 0 || RequestedChunk.RequestBytes < uint64_t(-1)) + { + uint64_t Offset = RequestedChunk.Offset; + if (Offset > Payload.Size()) + { + Offset = Payload.Size(); + } + uint64_t Size = RequestedChunk.RequestBytes; + if ((Offset + Size) > Payload.Size()) + { + Size = Payload.Size() - Offset; + } + OutBlobs[Index + 1] = IoBuffer(Payload, Offset, Size); + OutBlobs[Index + 1].MakeOwned(); + } + } + return true; + } + catch (const std::exception& Ex) + { + ZEN_WARN("oplog '{}/{}': failed getting chunk in batch request for id {}. 
Reason: '{}'", + ProjectId, + OplogId, + ChunkIds[Index], + Ex.what()); + } + return true; + }, + &GetSmallWorkerPool(EWorkloadType::Burst), + 0); + + uint8_t* ResponsePtr = reinterpret_cast<uint8_t*>(OutBlobs[0].MutableData()); + ResponseHeader ResponseHdr; + ResponseHdr.ChunkCount = RequestHdr.ChunkCount; + memcpy(ResponsePtr, &ResponseHdr, sizeof(ResponseHdr)); + ResponsePtr += sizeof(ResponseHdr); + for (uint32_t ChunkIndex = 0; ChunkIndex < RequestHdr.ChunkCount; ++ChunkIndex) + { + const IoBuffer& FoundChunk(OutBlobs[ChunkIndex + 1]); + ResponseChunkEntry ResponseChunk; + ResponseChunk.CorrelationId = RequestedChunks[ChunkIndex].CorrelationId; + if (FoundChunk) + { + ResponseChunk.ChunkSize = FoundChunk.Size(); + } + else + { + ResponseChunk.ChunkSize = uint64_t(-1); + } + memcpy(ResponsePtr, &ResponseChunk, sizeof(ResponseChunk)); + ResponsePtr += sizeof(ResponseChunk); + } + m_ProjectStats.ChunkHitCount += RequestHdr.ChunkCount; + return HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kBinary, OutBlobs); +} + +void HttpProjectService::HandleChunkBatchRequest(HttpRouterRequest& Req) { ZEN_TRACE_CPU("ProjectService::ChunkBatch"); diff --git a/src/zenserver/projectstore/httpprojectstore.h b/src/zenserver/projectstore/httpprojectstore.h index 9990ee264..fa9567963 100644 --- a/src/zenserver/projectstore/httpprojectstore.h +++ b/src/zenserver/projectstore/httpprojectstore.h @@ -63,6 +63,7 @@ private: void HandleProjectListRequest(HttpRouterRequest& Req); void HandleChunkBatchRequest(HttpRouterRequest& Req); + void HandleChunkBatch2Request(HttpRouterRequest& Req); void HandleFilesRequest(HttpRouterRequest& Req); void HandleChunkInfosRequest(HttpRouterRequest& Req); void HandleChunkInfoRequest(HttpRouterRequest& Req); diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp index 5d9d7aa24..c2bddf532 100644 --- a/src/zenserver/projectstore/projectstore.cpp +++ b/src/zenserver/projectstore/projectstore.cpp 
@@ -1799,15 +1799,17 @@ ProjectStore::Oplog::GetChunkByRawHash(const IoHash& RawHash) bool ProjectStore::Oplog::IterateChunks(std::span<IoHash> RawHashes, const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback, - WorkerThreadPool* OptionalWorkerPool) + WorkerThreadPool* OptionalWorkerPool, + uint64_t LargeSizeLimit) { - return m_CidStore.IterateChunks(RawHashes, AsyncCallback, OptionalWorkerPool); + return m_CidStore.IterateChunks(RawHashes, AsyncCallback, OptionalWorkerPool, LargeSizeLimit); } bool ProjectStore::Oplog::IterateChunks(std::span<Oid> ChunkIds, const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback, - WorkerThreadPool* OptionalWorkerPool) + WorkerThreadPool* OptionalWorkerPool, + uint64_t LargeSizeLimit) { std::vector<size_t> CidChunkIndexes; std::vector<IoHash> CidChunkHashes; @@ -1838,7 +1840,8 @@ ProjectStore::Oplog::IterateChunks(std::span<Oid> ChunkIds, m_CidStore.IterateChunks( CidChunkHashes, [&](size_t Index, const IoBuffer& Payload) { return AsyncCallback(CidChunkIndexes[Index], Payload); }, - OptionalWorkerPool); + OptionalWorkerPool, + LargeSizeLimit); if (OptionalWorkerPool) { @@ -3807,7 +3810,8 @@ ProjectStore::GetProjectFiles(const std::string_view ProjectId, } return true; }, - &GetSmallWorkerPool(EWorkloadType::Burst)); + &GetSmallWorkerPool(EWorkloadType::Burst), + 0); } CbObjectWriter Response; @@ -3931,7 +3935,8 @@ ProjectStore::GetProjectChunkInfos(const std::string_view ProjectId, } return true; }, - &WorkerPool); + &WorkerPool, + 0); } CbObjectWriter Response; @@ -4040,38 +4045,19 @@ ProjectStore::GetChunkRange(const std::string_view ProjectId, return GetChunkRange(ProjectId, OplogId, Obj, Offset, Size, AcceptType, OutChunk, OutContentType); } -std::pair<HttpResponseCode, std::string> -ProjectStore::GetChunkRange(const std::string_view ProjectId, - const std::string_view OplogId, - Oid ChunkId, - uint64_t Offset, - uint64_t Size, - ZenContentType AcceptType, - CompositeBuffer& 
OutChunk, - ZenContentType& OutContentType) +static std::pair<HttpResponseCode, std::string> +GetChunkRange(IoBuffer&& Chunk, + const std::string_view ProjectId, + const std::string_view OplogId, + const std::string_view ChunkId, + uint64_t Offset, + uint64_t Size, + ZenContentType AcceptType, + CompositeBuffer& OutChunk, + ZenContentType& OutContentType) { bool IsOffset = Offset != 0 || Size != ~(0ull); - Ref<ProjectStore::Project> Project = OpenProject(ProjectId); - if (!Project) - { - return {HttpResponseCode::NotFound, fmt::format("Chunk request for unknown project '{}'", ProjectId)}; - } - Project->TouchProject(); - - ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId, /*AllowCompact*/ true, /*VerifyPathOnDisk*/ false); - if (!FoundLog) - { - return {HttpResponseCode::NotFound, fmt::format("Chunk request for unknown oplog '{}/{}'", ProjectId, OplogId)}; - } - Project->TouchOplog(OplogId); - - IoBuffer Chunk = FoundLog->FindChunk(ChunkId); - if (!Chunk) - { - return {HttpResponseCode::NotFound, {}}; - } - OutContentType = Chunk.GetContentType(); if (OutContentType == ZenContentType::kCompressedBinary) @@ -4151,11 +4137,43 @@ ProjectStore::GetChunkRange(const std::string_view ProjectId, { OutChunk = CompositeBuffer(SharedBuffer(std::move(Chunk))); } - return {HttpResponseCode::OK, {}}; } std::pair<HttpResponseCode, std::string> +ProjectStore::GetChunkRange(const std::string_view ProjectId, + const std::string_view OplogId, + Oid ChunkId, + uint64_t Offset, + uint64_t Size, + ZenContentType AcceptType, + CompositeBuffer& OutChunk, + ZenContentType& OutContentType) +{ + Ref<ProjectStore::Project> Project = OpenProject(ProjectId); + if (!Project) + { + return {HttpResponseCode::NotFound, fmt::format("Chunk request for unknown project '{}'", ProjectId)}; + } + Project->TouchProject(); + + ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId, /*AllowCompact*/ true, /*VerifyPathOnDisk*/ false); + if (!FoundLog) + { + return {HttpResponseCode::NotFound, 
fmt::format("Chunk request for unknown oplog '{}/{}'", ProjectId, OplogId)}; + } + Project->TouchOplog(OplogId); + + IoBuffer Chunk = FoundLog->FindChunk(ChunkId); + if (!Chunk) + { + return {HttpResponseCode::NotFound, {}}; + } + + return zen::GetChunkRange(std::move(Chunk), ProjectId, OplogId, ChunkId.ToString(), Offset, Size, AcceptType, OutChunk, OutContentType); +} + +std::pair<HttpResponseCode, std::string> ProjectStore::GetChunk(const std::string_view ProjectId, const std::string_view OplogId, const std::string_view Cid, @@ -4512,66 +4530,219 @@ ProjectStore::Rpc(HttpServerRequest& HttpReq, { ZEN_TRACE_CPU("Store::Rpc::getchunks"); CbPackage ResponsePackage; + + uint64_t LargeSizeLimit = 0; // Go with default in-memory size + + RpcAcceptOptions AcceptFlags = static_cast<RpcAcceptOptions>(Cb["AcceptFlags"sv].AsUInt16(0u)); + FormatFlags Flags = FormatFlags::kDefault; + int TargetPid = Cb["Pid"sv].AsInt32(0); + void* TargetProcessHandle = nullptr; + + if (EnumHasAllFlags(AcceptFlags, RpcAcceptOptions::kAllowLocalReferences)) + { + Flags |= FormatFlags::kAllowLocalReferences; + TargetProcessHandle = m_OpenProcessCache.GetProcessHandle(HttpReq.SessionId(), TargetPid); + if (!EnumHasAnyFlags(AcceptFlags, RpcAcceptOptions::kAllowPartialLocalReferences)) + { + Flags |= FormatFlags::kDenyPartialLocalReferences; + } + else if (TargetProcessHandle != nullptr) + { + // If we allow partial references to blocks, only put very small chunks in memory + LargeSizeLimit = 1024u; + } + } + { - CbArrayView ChunksArray = Cb["chunks"sv].AsArrayView(); - std::vector<IoHash> ChunkHashes; + CbArrayView ChunksArray = Cb["chunks"sv].AsArrayView(); + std::vector<IoHash> ChunkHashes; + std::vector<size_t> ChunkHashIndexes; + std::vector<Oid> ChunkIds; + std::vector<size_t> ChunkIdIndexes; + std::vector<uint64_t> ChunkRangeOffsets; + std::vector<uint64_t> ChunkRangeSizes; + ChunkHashes.reserve(ChunksArray.Num()); + size_t IndexOffset = 0; for (CbFieldView FieldView : ChunksArray) { - 
ChunkHashes.push_back(FieldView.AsHash()); + if (FieldView.IsHash()) + { + ChunkHashes.push_back(FieldView.AsHash()); + ChunkHashIndexes.push_back(IndexOffset); + } + else if (FieldView.IsObjectId()) + { + ChunkIds.push_back(FieldView.AsObjectId()); + ChunkIdIndexes.push_back(IndexOffset); + } + IndexOffset++; } + CbArrayView RangeOffsetsArray = Cb["offsets"sv].AsArrayView(); + ChunkRangeOffsets.reserve(ChunksArray.Num()); + for (CbFieldView FieldView : RangeOffsetsArray) + { + ChunkRangeOffsets.push_back(FieldView.AsUInt64(0)); + } + ChunkRangeOffsets.resize(ChunksArray.Num(), 0); + CbArrayView RangeSizesArray = Cb["sizes"sv].AsArrayView(); + ChunkRangeSizes.reserve(ChunksArray.Num()); + for (CbFieldView FieldView : RangeSizesArray) + { + ChunkRangeSizes.push_back(FieldView.AsUInt64((uint64_t)-1)); + } + ChunkRangeSizes.resize(ChunksArray.Num(), (uint64_t)-1); - std::vector<CompressedBuffer> CompressedBuffers; - CompressedBuffers.resize(ChunksArray.Num()); - (void)m_CidStore.IterateChunks( - ChunkHashes, - [&](size_t Index, const IoBuffer& Payload) { - try - { - IoHash RawHash; - uint64_t RawSize; - CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Payload), RawHash, RawSize); - if (Compressed && ChunkHashes[Index] == RawHash) + std::vector<CompressedBuffer> ChunkBuffers; + if (!ChunkHashes.empty()) + { + ChunkBuffers.resize(ChunkHashes.size()); + (void)m_CidStore.IterateChunks( + ChunkHashes, + [&](size_t Index, const IoBuffer& Payload) { + try { - CompressedBuffers[Index] = Compressed.MakeOwned(); + size_t RequestIndex = ChunkHashIndexes[Index]; + ZenContentType ContentType; + CompositeBuffer Chunk; + std::pair<HttpResponseCode, std::string> GetRangeResponse = + zen::GetChunkRange(IoBuffer(Payload), + ProjectId, + OplogId, + ChunkHashes[Index].ToHexString(), + ChunkRangeOffsets[RequestIndex], + ChunkRangeSizes[RequestIndex], + ZenContentType::kCompressedBinary, + Chunk, + ContentType); + if (GetRangeResponse.first == 
HttpResponseCode::OK) + { + ZEN_ASSERT(ContentType == ZenContentType::kCompressedBinary); + ChunkBuffers[Index] = CompressedBuffer::FromCompressedNoValidate(std::move(Chunk)).MakeOwned(); + } + else + { + ZEN_WARN("oplog '{}/{}': failed getting chunk in batch request for chunk {}. Reason: '{}'", + ProjectId, + OplogId, + ChunkHashes[Index], + GetRangeResponse.second); + } + return true; } - else + catch (const std::exception& Ex) { - ZEN_WARN("oplog '{}/{}': invalid compressed binary in cas store for {}", + ZEN_WARN("oplog '{}/{}': failed getting chunk in batch request for chunk {}. Reason: '{}'", ProjectId, OplogId, - ChunkHashes[Index]); + ChunkHashes[Index], + Ex.what()); } return true; - } - catch (const std::exception& Ex) + }, + &GetSmallWorkerPool(EWorkloadType::Burst), + LargeSizeLimit); + } + std::vector<std::pair<ZenContentType, CompositeBuffer>> ValueBuffers; + if (!ChunkIds.empty()) + { + ValueBuffers.resize(ChunkIds.size()); + Oplog->IterateChunks( + ChunkIds, + [&](size_t Index, const IoBuffer& Payload) { + try + { + size_t RequestIndex = ChunkIdIndexes[Index]; + CompositeBuffer Chunk; + std::pair<HttpResponseCode, std::string> GetRangeResponse = + zen::GetChunkRange(IoBuffer(Payload), + ProjectId, + OplogId, + ChunkHashes[Index].ToHexString(), + ChunkRangeOffsets[RequestIndex], + ChunkRangeSizes[RequestIndex], + ZenContentType::kCompressedBinary, + Chunk, + ValueBuffers[Index].first); + if (GetRangeResponse.first == HttpResponseCode::OK) + { + ValueBuffers[Index].second = std::move(Chunk).MakeOwned(); + } + else + { + ZEN_WARN("oplog '{}/{}': failed getting chunk in batch request for chunk {}. Reason: '{}'", + ProjectId, + OplogId, + ChunkHashes[Index], + GetRangeResponse.second); + } + return true; + } + catch (const std::exception& Ex) + { + ZEN_WARN("oplog '{}/{}': failed getting chunk in batch request for chunk {}. 
Reason: '{}'", + ProjectId, + OplogId, + ChunkHashes[Index], + Ex.what()); + } + return true; + }, + &GetSmallWorkerPool(EWorkloadType::Burst), + LargeSizeLimit); + } + + CbObjectWriter ResponseWriter; + if (!ChunkHashes.empty()) + { + ResponseWriter.BeginArray("chunks"sv); + for (size_t Index = 0; Index < ChunkHashes.size(); Index++) + { + CompressedBuffer& Compressed = ChunkBuffers[Index]; + if (Compressed) { - ZEN_WARN("oplog '{}/{}': failed getting chunk in batch request for chunk {}. Reason: '{}'", - ProjectId, - OplogId, - ChunkHashes[Index], - Ex.what()); + const IoHash& RawHash = ChunkHashes[Index]; + ResponseWriter.AddHash(RawHash); + ResponsePackage.AddAttachment(CbAttachment(std::move(Compressed), RawHash)); } - return true; - }, - &GetSmallWorkerPool(EWorkloadType::Burst)); + } + ResponseWriter.EndArray(); + } - CbObjectWriter ResponseWriter; - ResponseWriter.BeginArray("chunks"sv); - for (size_t Index = 0; Index < ChunkHashes.size(); Index++) + if (!ChunkIds.empty()) { - CompressedBuffer& Compressed = CompressedBuffers[Index]; - if (Compressed) + ResponseWriter.BeginArray("values"sv); + for (size_t Index = 0; Index < ChunkIds.size(); Index++) { - const IoHash& RawHash = ChunkHashes[Index]; - ResponseWriter.AddHash(RawHash); - ResponsePackage.AddAttachment(CbAttachment(std::move(Compressed), RawHash)); + CompositeBuffer& Composite = ValueBuffers[Index].second; + if (Composite) + { + ResponseWriter.BeginObject(); + const Oid& Id = ChunkIds[Index]; + ResponseWriter.AddObjectId("id", Id); + if (ValueBuffers[Index].first == ZenContentType::kCompressedBinary) + { + IoHash RawHash; + uint64_t _; + CompressedBuffer Compressed = CompressedBuffer::FromCompressed(std::move(Composite), RawHash, _); + ResponseWriter.AddHash("rawhash", RawHash); + ResponsePackage.AddAttachment(CbAttachment(std::move(Compressed), RawHash)); + } + else + { + IoHash RawHash = IoHash::HashBuffer(Composite); + ResponseWriter.AddHash("rawhash", RawHash); + 
ResponsePackage.AddAttachment(CbAttachment(std::move(Composite), RawHash)); + } + ResponseWriter.EndObject(); + } } + ResponseWriter.EndArray(); } - ResponseWriter.EndArray(); ResponsePackage.SetObject(ResponseWriter.Save()); } - CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(ResponsePackage, FormatFlags::kDefault); + + CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(ResponsePackage, Flags, TargetProcessHandle); HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer); return true; } diff --git a/src/zenserver/projectstore/projectstore.h b/src/zenserver/projectstore/projectstore.h index 49970b677..52debe271 100644 --- a/src/zenserver/projectstore/projectstore.h +++ b/src/zenserver/projectstore/projectstore.h @@ -7,6 +7,7 @@ #include <zencore/xxhash.h> #include <zenhttp/httpserver.h> #include <zenstore/gc.h> +#include <zenutil/openprocesscache.h> ZEN_THIRD_PARTY_INCLUDES_START #include <tsl/robin_map.h> @@ -118,10 +119,12 @@ public: IoBuffer GetChunkByRawHash(const IoHash& RawHash); bool IterateChunks(std::span<IoHash> RawHashes, const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback, - WorkerThreadPool* OptionalWorkerPool); + WorkerThreadPool* OptionalWorkerPool, + uint64_t LargeSizeLimit); bool IterateChunks(std::span<Oid> ChunkIds, const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback, - WorkerThreadPool* OptionalWorkerPool); + WorkerThreadPool* OptionalWorkerPool, + uint64_t LargeSizeLimit); inline static const uint32_t kInvalidOp = ~0u; /** Persist a new oplog entry @@ -469,6 +472,7 @@ private: mutable RwLock m_UpdateCaptureLock; uint32_t m_UpdateCaptureRefCounter = 0; std::unique_ptr<std::vector<std::string>> m_CapturedProjects; + OpenProcessCache m_OpenProcessCache; std::filesystem::path BasePathForProject(std::string_view ProjectId); diff --git a/src/zenstore/cas.cpp b/src/zenstore/cas.cpp index bff221fc7..4252fc859 100644 --- 
a/src/zenstore/cas.cpp +++ b/src/zenstore/cas.cpp @@ -59,7 +59,8 @@ public: virtual void FilterChunks(HashKeySet& InOutChunks) override; virtual bool IterateChunks(std::span<IoHash> DecompressedIds, const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback, - WorkerThreadPool* OptionalWorkerPool) override; + WorkerThreadPool* OptionalWorkerPool, + uint64_t LargeSizeLimit) override; virtual void Flush() override; virtual void ScrubStorage(ScrubContext& Ctx) override; virtual CidStoreSize TotalSize() const override; @@ -392,7 +393,8 @@ CasImpl::FilterChunks(HashKeySet& InOutChunks) bool CasImpl::IterateChunks(std::span<IoHash> DecompressedIds, const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback, - WorkerThreadPool* OptionalWorkerPool) + WorkerThreadPool* OptionalWorkerPool, + uint64_t LargeSizeLimit) { ZEN_TRACE_CPU("CAS::IterateChunks"); if (!m_SmallStrategy.IterateChunks( @@ -402,7 +404,8 @@ CasImpl::IterateChunks(std::span<IoHash> DecompressedIds, Chunk.SetContentType(ZenContentType::kCompressedBinary); return AsyncCallback(Index, Payload); }, - OptionalWorkerPool)) + OptionalWorkerPool, + LargeSizeLimit)) { return false; } @@ -413,7 +416,8 @@ CasImpl::IterateChunks(std::span<IoHash> DecompressedIds, Chunk.SetContentType(ZenContentType::kCompressedBinary); return AsyncCallback(Index, Payload); }, - OptionalWorkerPool)) + OptionalWorkerPool, + LargeSizeLimit)) { return false; } diff --git a/src/zenstore/cas.h b/src/zenstore/cas.h index bedbc6a9a..e279dd2cc 100644 --- a/src/zenstore/cas.h +++ b/src/zenstore/cas.h @@ -47,7 +47,8 @@ public: virtual void FilterChunks(HashKeySet& InOutChunks) = 0; virtual bool IterateChunks(std::span<IoHash> DecompressedIds, const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback, - WorkerThreadPool* OptionalWorkerPool) = 0; + WorkerThreadPool* OptionalWorkerPool, + uint64_t LargeSizeLimit) = 0; virtual void Flush() = 0; virtual void ScrubStorage(ScrubContext& 
Ctx) = 0; virtual CidStoreSize TotalSize() const = 0; diff --git a/src/zenstore/cidstore.cpp b/src/zenstore/cidstore.cpp index 71fd596f4..2ab769d04 100644 --- a/src/zenstore/cidstore.cpp +++ b/src/zenstore/cidstore.cpp @@ -119,9 +119,10 @@ struct CidStore::Impl bool IterateChunks(std::span<IoHash> DecompressedIds, const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback, - WorkerThreadPool* OptionalWorkerPool) + WorkerThreadPool* OptionalWorkerPool, + uint64_t LargeSizeLimit) { - return m_CasStore.IterateChunks(DecompressedIds, AsyncCallback, OptionalWorkerPool); + return m_CasStore.IterateChunks(DecompressedIds, AsyncCallback, OptionalWorkerPool, LargeSizeLimit); } void Flush() { m_CasStore.Flush(); } @@ -217,9 +218,10 @@ CidStore::ContainsChunk(const IoHash& DecompressedId) bool CidStore::IterateChunks(std::span<IoHash> DecompressedIds, const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback, - WorkerThreadPool* OptionalWorkerPool) + WorkerThreadPool* OptionalWorkerPool, + uint64_t LargeSizeLimit) { - return m_Impl->IterateChunks(DecompressedIds, AsyncCallback, OptionalWorkerPool); + return m_Impl->IterateChunks(DecompressedIds, AsyncCallback, OptionalWorkerPool, LargeSizeLimit); } void diff --git a/src/zenstore/compactcas.cpp b/src/zenstore/compactcas.cpp index 7f1300177..f479a7173 100644 --- a/src/zenstore/compactcas.cpp +++ b/src/zenstore/compactcas.cpp @@ -305,7 +305,8 @@ CasContainerStrategy::FilterChunks(HashKeySet& InOutChunks) bool CasContainerStrategy::IterateChunks(std::span<IoHash> ChunkHashes, const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback, - WorkerThreadPool* OptionalWorkerPool) + WorkerThreadPool* OptionalWorkerPool, + uint64_t LargeSizeLimit) { std::vector<size_t> FoundChunkIndexes; std::vector<BlockStoreLocation> FoundChunkLocations; @@ -332,7 +333,8 @@ CasContainerStrategy::IterateChunks(std::span<IoHash> ChunkHashes, }, [&](size_t ChunkIndex, BlockStoreFile& File, 
uint64_t Offset, uint64_t Size) { return AsyncCallback(FoundChunkIndexes[ChunkIndex], File.GetChunk(Offset, Size)); - }); + }, + LargeSizeLimit); }; Latch WorkLatch(1); diff --git a/src/zenstore/compactcas.h b/src/zenstore/compactcas.h index 44567e7a0..07e620086 100644 --- a/src/zenstore/compactcas.h +++ b/src/zenstore/compactcas.h @@ -58,7 +58,8 @@ struct CasContainerStrategy final : public GcStorage, public GcReferenceStore void FilterChunks(HashKeySet& InOutChunks); bool IterateChunks(std::span<IoHash> ChunkHashes, const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback, - WorkerThreadPool* OptionalWorkerPool); + WorkerThreadPool* OptionalWorkerPool, + uint64_t LargeSizeLimit); void Initialize(const std::filesystem::path& RootDirectory, const std::string_view ContainerBaseName, uint32_t MaxBlockSize, diff --git a/src/zenstore/include/zenstore/cidstore.h b/src/zenstore/include/zenstore/cidstore.h index d95fa7cd4..b3d00fec0 100644 --- a/src/zenstore/include/zenstore/cidstore.h +++ b/src/zenstore/include/zenstore/cidstore.h @@ -82,7 +82,8 @@ public: virtual IoBuffer FindChunkByCid(const IoHash& DecompressedId) override; bool IterateChunks(std::span<IoHash> DecompressedIds, const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback, - WorkerThreadPool* OptionalWorkerPool); + WorkerThreadPool* OptionalWorkerPool, + uint64_t LargeSizeLimit); bool ContainsChunk(const IoHash& DecompressedId); void FilterChunks(HashKeySet& InOutChunks); void Flush(); |