about summary refs log tree commit diff
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2024-11-11 09:46:09 +0100
committerDan Engelbrecht <[email protected]>2024-11-11 09:46:09 +0100
commit4584fd6e56fa5c5a7428828e7d3aea4e25a17977 (patch)
treec7f0ae3ea387585fb167fb9f5dfc3ecad8918e34
parentuse IterateChunks for "getchunks" projectstore rpc request (diff)
downloadzen-de/improved-projectstore-batch-requests.tar.xz
zen-de/improved-projectstore-batch-requests.zip
allow control of size for batch iteration;
allow adding CompositeBuffers as attachments directly;
add batch2 httpstore api to allow batching of CAS & Oid with range requests;
allow responses with file handles from project store

Signed-off-by: Dan Engelbrecht <[email protected]>
-rw-r--r--src/zencore/include/zencore/compactbinarypackage.h5
-rw-r--r--src/zenserver/cache/httpstructuredcache.cpp6
-rw-r--r--src/zenserver/projectstore/httpprojectstore.cpp170
-rw-r--r--src/zenserver/projectstore/httpprojectstore.h1
-rw-r--r--src/zenserver/projectstore/projectstore.cpp319
-rw-r--r--src/zenserver/projectstore/projectstore.h8
-rw-r--r--src/zenstore/cas.cpp12
-rw-r--r--src/zenstore/cas.h3
-rw-r--r--src/zenstore/cidstore.cpp10
-rw-r--r--src/zenstore/compactcas.cpp6
-rw-r--r--src/zenstore/compactcas.h3
-rw-r--r--src/zenstore/include/zenstore/cidstore.h3
12 files changed, 453 insertions, 93 deletions
diff --git a/src/zencore/include/zencore/compactbinarypackage.h b/src/zencore/include/zencore/compactbinarypackage.h
index fe4a60a30..81c026b15 100644
--- a/src/zencore/include/zencore/compactbinarypackage.h
+++ b/src/zencore/include/zencore/compactbinarypackage.h
@@ -64,6 +64,9 @@ public:
ZENCORE_API CbAttachment(const CompressedBuffer& InValue, const IoHash& Hash);
ZENCORE_API CbAttachment(CompressedBuffer&& InValue, const IoHash& Hash);
+ ZENCORE_API explicit CbAttachment(CompositeBuffer&& InValue);
+ ZENCORE_API CbAttachment(CompositeBuffer&& InValue, const IoHash& Hash);
+
/** Reset this to a null attachment. */
inline void Reset() { *this = CbAttachment(); }
@@ -129,8 +132,6 @@ public:
private:
ZENCORE_API CbAttachment(const CbObject& Value, const IoHash* Hash);
- ZENCORE_API explicit CbAttachment(CompositeBuffer&& InValue);
- ZENCORE_API CbAttachment(CompositeBuffer&& InValue, const IoHash& Hash);
IoHash Hash;
std::variant<std::nullptr_t, CbObject, CompositeBuffer, CompressedBuffer> Value;
diff --git a/src/zenserver/cache/httpstructuredcache.cpp b/src/zenserver/cache/httpstructuredcache.cpp
index 551b5a76d..26c56a314 100644
--- a/src/zenserver/cache/httpstructuredcache.cpp
+++ b/src/zenserver/cache/httpstructuredcache.cpp
@@ -672,7 +672,8 @@ HttpStructuredCacheService::HandleCacheNamespaceRequest(HttpServerRequest& Reque
AttachmentsSize += Payload.GetSize();
return true;
},
- &WorkerPool);
+ &WorkerPool,
+ 0);
ResponseWriter << "Count" << AllAttachments.size();
ResponseWriter << "Size" << AttachmentsSize;
@@ -765,7 +766,8 @@ HttpStructuredCacheService::HandleCacheBucketRequest(HttpServerRequest& Request,
AttachmentsSize += Payload.GetSize();
return true;
},
- &WorkerPool);
+ &WorkerPool,
+ 0);
ResponseWriter << "AttachmentsSize" << AttachmentsSize;
}
diff --git a/src/zenserver/projectstore/httpprojectstore.cpp b/src/zenserver/projectstore/httpprojectstore.cpp
index 1b45e66f3..d2e138791 100644
--- a/src/zenserver/projectstore/httpprojectstore.cpp
+++ b/src/zenserver/projectstore/httpprojectstore.cpp
@@ -14,6 +14,7 @@
#include <zencore/stream.h>
#include <zencore/trace.h>
#include <zenstore/zenstore.h>
+#include <zenutil/workerpools.h>
namespace zen {
@@ -257,6 +258,11 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects,
HttpVerb::kPost);
m_Router.RegisterRoute(
+ "{project}/oplog/{log}/batch2",
+ [this](HttpRouterRequest& Req) { HandleChunkBatch2Request(Req); },
+ HttpVerb::kPost);
+
+ m_Router.RegisterRoute(
"{project}/oplog/{log}/files",
[this](HttpRouterRequest& Req) { HandleFilesRequest(Req); },
HttpVerb::kGet);
@@ -456,6 +462,170 @@ HttpProjectService::HandleProjectListRequest(HttpRouterRequest& Req)
}
void
+HttpProjectService::HandleChunkBatch2Request(HttpRouterRequest& Req)
+{
+ ZEN_TRACE_CPU("ProjectService::ChunkBatch2");
+
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+ const auto& ProjectId = Req.GetCapture(1);
+ const auto& OplogId = Req.GetCapture(2);
+
+ Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId);
+ if (!Project)
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound);
+ }
+ Project->TouchProject();
+
+ ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId, /*AllowCompact*/ true, /*VerifyPathOnDisk*/ false);
+ if (!FoundLog)
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound);
+ }
+ Project->TouchOplog(OplogId);
+
+ // Parse Request
+
+ IoBuffer Payload = HttpReq.ReadPayload();
+ BinaryReader Reader(Payload);
+
+ struct RequestHeader
+ {
+ enum
+ {
+ kMagic = 0xAAAA'77AC
+ };
+ uint32_t Magic;
+ uint32_t ChunkCount;
+ uint32_t Reserved1;
+ uint32_t Reserved2;
+ };
+
+ struct RequestChunkEntry
+ {
+ Oid ChunkId;
+ uint32_t CorrelationId;
+ uint64_t Offset;
+ uint64_t RequestBytes;
+ };
+
+ if (Payload.Size() <= sizeof(RequestHeader))
+ {
+ m_ProjectStats.BadRequestCount++;
+ HttpReq.WriteResponse(HttpResponseCode::BadRequest);
+ }
+
+ RequestHeader RequestHdr;
+ Reader.Read(&RequestHdr, sizeof RequestHdr);
+
+ if (RequestHdr.Magic != RequestHeader::kMagic)
+ {
+ m_ProjectStats.BadRequestCount++;
+ HttpReq.WriteResponse(HttpResponseCode::BadRequest);
+ }
+
+ std::vector<RequestChunkEntry> RequestedChunks;
+ RequestedChunks.resize(RequestHdr.ChunkCount);
+ Reader.Read(RequestedChunks.data(), sizeof(RequestChunkEntry) * RequestHdr.ChunkCount);
+
+ // Make Response
+
+ struct ResponseHeader
+ {
+ uint32_t Magic = 0xbada'b00f;
+ uint32_t ChunkCount;
+ uint32_t Reserved1 = 0;
+ uint32_t Reserved2 = 0;
+ };
+
+ struct ResponseChunkEntry
+ {
+ uint32_t CorrelationId;
+ uint32_t Flags = 0;
+ uint64_t ChunkSize;
+ };
+
+ std::vector<IoBuffer> OutBlobs;
+ OutBlobs.emplace_back(sizeof(ResponseHeader) + RequestHdr.ChunkCount * sizeof(ResponseChunkEntry));
+
+ std::vector<Oid> ChunkIds;
+ (void)FoundLog->IterateChunks(
+ ChunkIds,
+ [&](size_t Index, const IoBuffer& Payload) {
+ try
+ {
+ const RequestChunkEntry& RequestedChunk = RequestedChunks[Index];
+ if (Payload.GetContentType() == ZenContentType::kCompressedBinary)
+ {
+ IoHash RawHash;
+ uint64_t RawSize;
+ if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Payload), RawHash, RawSize))
+ {
+ if (SharedBuffer Decompressed = Compressed.Decompress(RequestedChunk.Offset, RequestedChunk.RequestBytes))
+ {
+ OutBlobs[Index + 1] = Decompressed.AsIoBuffer();
+ }
+ }
+ }
+ else
+ {
+ if (RequestedChunk.Offset > 0 || RequestedChunk.RequestBytes < uint64_t(-1))
+ {
+ uint64_t Offset = RequestedChunk.Offset;
+ if (Offset > Payload.Size())
+ {
+ Offset = Payload.Size();
+ }
+ uint64_t Size = RequestedChunk.RequestBytes;
+ if ((Offset + Size) > Payload.Size())
+ {
+ Size = Payload.Size() - Offset;
+ }
+ OutBlobs[Index + 1] = IoBuffer(Payload, Offset, Size);
+ OutBlobs[Index + 1].MakeOwned();
+ }
+ }
+ return true;
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_WARN("oplog '{}/{}': failed getting chunk in batch request for id {}. Reason: '{}'",
+ ProjectId,
+ OplogId,
+ ChunkIds[Index],
+ Ex.what());
+ }
+ return true;
+ },
+ &GetSmallWorkerPool(EWorkloadType::Burst),
+ 0);
+
+ uint8_t* ResponsePtr = reinterpret_cast<uint8_t*>(OutBlobs[0].MutableData());
+ ResponseHeader ResponseHdr;
+ ResponseHdr.ChunkCount = RequestHdr.ChunkCount;
+ memcpy(ResponsePtr, &ResponseHdr, sizeof(ResponseHdr));
+ ResponsePtr += sizeof(ResponseHdr);
+ for (uint32_t ChunkIndex = 0; ChunkIndex < RequestHdr.ChunkCount; ++ChunkIndex)
+ {
+ const IoBuffer& FoundChunk(OutBlobs[ChunkIndex + 1]);
+ ResponseChunkEntry ResponseChunk;
+ ResponseChunk.CorrelationId = RequestedChunks[ChunkIndex].CorrelationId;
+ if (FoundChunk)
+ {
+ ResponseChunk.ChunkSize = FoundChunk.Size();
+ }
+ else
+ {
+ ResponseChunk.ChunkSize = uint64_t(-1);
+ }
+ memcpy(ResponsePtr, &ResponseChunk, sizeof(ResponseChunk));
+ ResponsePtr += sizeof(ResponseChunk);
+ }
+ m_ProjectStats.ChunkHitCount += RequestHdr.ChunkCount;
+ return HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kBinary, OutBlobs);
+}
+
+void
HttpProjectService::HandleChunkBatchRequest(HttpRouterRequest& Req)
{
ZEN_TRACE_CPU("ProjectService::ChunkBatch");
diff --git a/src/zenserver/projectstore/httpprojectstore.h b/src/zenserver/projectstore/httpprojectstore.h
index 9990ee264..fa9567963 100644
--- a/src/zenserver/projectstore/httpprojectstore.h
+++ b/src/zenserver/projectstore/httpprojectstore.h
@@ -63,6 +63,7 @@ private:
void HandleProjectListRequest(HttpRouterRequest& Req);
void HandleChunkBatchRequest(HttpRouterRequest& Req);
+ void HandleChunkBatch2Request(HttpRouterRequest& Req);
void HandleFilesRequest(HttpRouterRequest& Req);
void HandleChunkInfosRequest(HttpRouterRequest& Req);
void HandleChunkInfoRequest(HttpRouterRequest& Req);
diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp
index 5d9d7aa24..c2bddf532 100644
--- a/src/zenserver/projectstore/projectstore.cpp
+++ b/src/zenserver/projectstore/projectstore.cpp
@@ -1799,15 +1799,17 @@ ProjectStore::Oplog::GetChunkByRawHash(const IoHash& RawHash)
bool
ProjectStore::Oplog::IterateChunks(std::span<IoHash> RawHashes,
const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback,
- WorkerThreadPool* OptionalWorkerPool)
+ WorkerThreadPool* OptionalWorkerPool,
+ uint64_t LargeSizeLimit)
{
- return m_CidStore.IterateChunks(RawHashes, AsyncCallback, OptionalWorkerPool);
+ return m_CidStore.IterateChunks(RawHashes, AsyncCallback, OptionalWorkerPool, LargeSizeLimit);
}
bool
ProjectStore::Oplog::IterateChunks(std::span<Oid> ChunkIds,
const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback,
- WorkerThreadPool* OptionalWorkerPool)
+ WorkerThreadPool* OptionalWorkerPool,
+ uint64_t LargeSizeLimit)
{
std::vector<size_t> CidChunkIndexes;
std::vector<IoHash> CidChunkHashes;
@@ -1838,7 +1840,8 @@ ProjectStore::Oplog::IterateChunks(std::span<Oid> ChunkIds,
m_CidStore.IterateChunks(
CidChunkHashes,
[&](size_t Index, const IoBuffer& Payload) { return AsyncCallback(CidChunkIndexes[Index], Payload); },
- OptionalWorkerPool);
+ OptionalWorkerPool,
+ LargeSizeLimit);
if (OptionalWorkerPool)
{
@@ -3807,7 +3810,8 @@ ProjectStore::GetProjectFiles(const std::string_view ProjectId,
}
return true;
},
- &GetSmallWorkerPool(EWorkloadType::Burst));
+ &GetSmallWorkerPool(EWorkloadType::Burst),
+ 0);
}
CbObjectWriter Response;
@@ -3931,7 +3935,8 @@ ProjectStore::GetProjectChunkInfos(const std::string_view ProjectId,
}
return true;
},
- &WorkerPool);
+ &WorkerPool,
+ 0);
}
CbObjectWriter Response;
@@ -4040,38 +4045,19 @@ ProjectStore::GetChunkRange(const std::string_view ProjectId,
return GetChunkRange(ProjectId, OplogId, Obj, Offset, Size, AcceptType, OutChunk, OutContentType);
}
-std::pair<HttpResponseCode, std::string>
-ProjectStore::GetChunkRange(const std::string_view ProjectId,
- const std::string_view OplogId,
- Oid ChunkId,
- uint64_t Offset,
- uint64_t Size,
- ZenContentType AcceptType,
- CompositeBuffer& OutChunk,
- ZenContentType& OutContentType)
+static std::pair<HttpResponseCode, std::string>
+GetChunkRange(IoBuffer&& Chunk,
+ const std::string_view ProjectId,
+ const std::string_view OplogId,
+ const std::string_view ChunkId,
+ uint64_t Offset,
+ uint64_t Size,
+ ZenContentType AcceptType,
+ CompositeBuffer& OutChunk,
+ ZenContentType& OutContentType)
{
bool IsOffset = Offset != 0 || Size != ~(0ull);
- Ref<ProjectStore::Project> Project = OpenProject(ProjectId);
- if (!Project)
- {
- return {HttpResponseCode::NotFound, fmt::format("Chunk request for unknown project '{}'", ProjectId)};
- }
- Project->TouchProject();
-
- ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId, /*AllowCompact*/ true, /*VerifyPathOnDisk*/ false);
- if (!FoundLog)
- {
- return {HttpResponseCode::NotFound, fmt::format("Chunk request for unknown oplog '{}/{}'", ProjectId, OplogId)};
- }
- Project->TouchOplog(OplogId);
-
- IoBuffer Chunk = FoundLog->FindChunk(ChunkId);
- if (!Chunk)
- {
- return {HttpResponseCode::NotFound, {}};
- }
-
OutContentType = Chunk.GetContentType();
if (OutContentType == ZenContentType::kCompressedBinary)
@@ -4151,11 +4137,43 @@ ProjectStore::GetChunkRange(const std::string_view ProjectId,
{
OutChunk = CompositeBuffer(SharedBuffer(std::move(Chunk)));
}
-
return {HttpResponseCode::OK, {}};
}
std::pair<HttpResponseCode, std::string>
+ProjectStore::GetChunkRange(const std::string_view ProjectId,
+ const std::string_view OplogId,
+ Oid ChunkId,
+ uint64_t Offset,
+ uint64_t Size,
+ ZenContentType AcceptType,
+ CompositeBuffer& OutChunk,
+ ZenContentType& OutContentType)
+{
+ Ref<ProjectStore::Project> Project = OpenProject(ProjectId);
+ if (!Project)
+ {
+ return {HttpResponseCode::NotFound, fmt::format("Chunk request for unknown project '{}'", ProjectId)};
+ }
+ Project->TouchProject();
+
+ ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId, /*AllowCompact*/ true, /*VerifyPathOnDisk*/ false);
+ if (!FoundLog)
+ {
+ return {HttpResponseCode::NotFound, fmt::format("Chunk request for unknown oplog '{}/{}'", ProjectId, OplogId)};
+ }
+ Project->TouchOplog(OplogId);
+
+ IoBuffer Chunk = FoundLog->FindChunk(ChunkId);
+ if (!Chunk)
+ {
+ return {HttpResponseCode::NotFound, {}};
+ }
+
+ return zen::GetChunkRange(std::move(Chunk), ProjectId, OplogId, ChunkId.ToString(), Offset, Size, AcceptType, OutChunk, OutContentType);
+}
+
+std::pair<HttpResponseCode, std::string>
ProjectStore::GetChunk(const std::string_view ProjectId,
const std::string_view OplogId,
const std::string_view Cid,
@@ -4512,66 +4530,219 @@ ProjectStore::Rpc(HttpServerRequest& HttpReq,
{
ZEN_TRACE_CPU("Store::Rpc::getchunks");
CbPackage ResponsePackage;
+
+ uint64_t LargeSizeLimit = 0; // Go with default in-memory size
+
+ RpcAcceptOptions AcceptFlags = static_cast<RpcAcceptOptions>(Cb["AcceptFlags"sv].AsUInt16(0u));
+ FormatFlags Flags = FormatFlags::kDefault;
+ int TargetPid = Cb["Pid"sv].AsInt32(0);
+ void* TargetProcessHandle = nullptr;
+
+ if (EnumHasAllFlags(AcceptFlags, RpcAcceptOptions::kAllowLocalReferences))
+ {
+ Flags |= FormatFlags::kAllowLocalReferences;
+ TargetProcessHandle = m_OpenProcessCache.GetProcessHandle(HttpReq.SessionId(), TargetPid);
+ if (!EnumHasAnyFlags(AcceptFlags, RpcAcceptOptions::kAllowPartialLocalReferences))
+ {
+ Flags |= FormatFlags::kDenyPartialLocalReferences;
+ }
+ else if (TargetProcessHandle != nullptr)
+ {
+ // If we allow partial references to blocks, only put very small chunks in memory
+ LargeSizeLimit = 1024u;
+ }
+ }
+
{
- CbArrayView ChunksArray = Cb["chunks"sv].AsArrayView();
- std::vector<IoHash> ChunkHashes;
+ CbArrayView ChunksArray = Cb["chunks"sv].AsArrayView();
+ std::vector<IoHash> ChunkHashes;
+ std::vector<size_t> ChunkHashIndexes;
+ std::vector<Oid> ChunkIds;
+ std::vector<size_t> ChunkIdIndexes;
+ std::vector<uint64_t> ChunkRangeOffsets;
+ std::vector<uint64_t> ChunkRangeSizes;
+
ChunkHashes.reserve(ChunksArray.Num());
+ size_t IndexOffset = 0;
for (CbFieldView FieldView : ChunksArray)
{
- ChunkHashes.push_back(FieldView.AsHash());
+ if (FieldView.IsHash())
+ {
+ ChunkHashes.push_back(FieldView.AsHash());
+ ChunkHashIndexes.push_back(IndexOffset);
+ }
+ else if (FieldView.IsObjectId())
+ {
+ ChunkIds.push_back(FieldView.AsObjectId());
+ ChunkIdIndexes.push_back(IndexOffset);
+ }
+ IndexOffset++;
}
+ CbArrayView RangeOffsetsArray = Cb["offsets"sv].AsArrayView();
+ ChunkRangeOffsets.reserve(ChunksArray.Num());
+ for (CbFieldView FieldView : RangeOffsetsArray)
+ {
+ ChunkRangeOffsets.push_back(FieldView.AsUInt64(0));
+ }
+ ChunkRangeOffsets.resize(ChunksArray.Num(), 0);
+ CbArrayView RangeSizesArray = Cb["sizes"sv].AsArrayView();
+ ChunkRangeSizes.reserve(ChunksArray.Num());
+ for (CbFieldView FieldView : RangeSizesArray)
+ {
+ ChunkRangeSizes.push_back(FieldView.AsUInt64((uint64_t)-1));
+ }
+ ChunkRangeSizes.resize(ChunksArray.Num(), (uint64_t)-1);
- std::vector<CompressedBuffer> CompressedBuffers;
- CompressedBuffers.resize(ChunksArray.Num());
- (void)m_CidStore.IterateChunks(
- ChunkHashes,
- [&](size_t Index, const IoBuffer& Payload) {
- try
- {
- IoHash RawHash;
- uint64_t RawSize;
- CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Payload), RawHash, RawSize);
- if (Compressed && ChunkHashes[Index] == RawHash)
+ std::vector<CompressedBuffer> ChunkBuffers;
+ if (!ChunkHashes.empty())
+ {
+ ChunkBuffers.resize(ChunkHashes.size());
+ (void)m_CidStore.IterateChunks(
+ ChunkHashes,
+ [&](size_t Index, const IoBuffer& Payload) {
+ try
{
- CompressedBuffers[Index] = Compressed.MakeOwned();
+ size_t RequestIndex = ChunkHashIndexes[Index];
+ ZenContentType ContentType;
+ CompositeBuffer Chunk;
+ std::pair<HttpResponseCode, std::string> GetRangeResponse =
+ zen::GetChunkRange(IoBuffer(Payload),
+ ProjectId,
+ OplogId,
+ ChunkHashes[Index].ToHexString(),
+ ChunkRangeOffsets[RequestIndex],
+ ChunkRangeSizes[RequestIndex],
+ ZenContentType::kCompressedBinary,
+ Chunk,
+ ContentType);
+ if (GetRangeResponse.first == HttpResponseCode::OK)
+ {
+ ZEN_ASSERT(ContentType == ZenContentType::kCompressedBinary);
+ ChunkBuffers[Index] = CompressedBuffer::FromCompressedNoValidate(std::move(Chunk)).MakeOwned();
+ }
+ else
+ {
+ ZEN_WARN("oplog '{}/{}': failed getting chunk in batch request for chunk {}. Reason: '{}'",
+ ProjectId,
+ OplogId,
+ ChunkHashes[Index],
+ GetRangeResponse.second);
+ }
+ return true;
}
- else
+ catch (const std::exception& Ex)
{
- ZEN_WARN("oplog '{}/{}': invalid compressed binary in cas store for {}",
+ ZEN_WARN("oplog '{}/{}': failed getting chunk in batch request for chunk {}. Reason: '{}'",
ProjectId,
OplogId,
- ChunkHashes[Index]);
+ ChunkHashes[Index],
+ Ex.what());
}
return true;
- }
- catch (const std::exception& Ex)
+ },
+ &GetSmallWorkerPool(EWorkloadType::Burst),
+ LargeSizeLimit);
+ }
+ std::vector<std::pair<ZenContentType, CompositeBuffer>> ValueBuffers;
+ if (!ChunkIds.empty())
+ {
+ ValueBuffers.resize(ChunkIds.size());
+ Oplog->IterateChunks(
+ ChunkIds,
+ [&](size_t Index, const IoBuffer& Payload) {
+ try
+ {
+ size_t RequestIndex = ChunkIdIndexes[Index];
+ CompositeBuffer Chunk;
+ std::pair<HttpResponseCode, std::string> GetRangeResponse =
+ zen::GetChunkRange(IoBuffer(Payload),
+ ProjectId,
+ OplogId,
+ ChunkHashes[Index].ToHexString(),
+ ChunkRangeOffsets[RequestIndex],
+ ChunkRangeSizes[RequestIndex],
+ ZenContentType::kCompressedBinary,
+ Chunk,
+ ValueBuffers[Index].first);
+ if (GetRangeResponse.first == HttpResponseCode::OK)
+ {
+ ValueBuffers[Index].second = std::move(Chunk).MakeOwned();
+ }
+ else
+ {
+ ZEN_WARN("oplog '{}/{}': failed getting chunk in batch request for chunk {}. Reason: '{}'",
+ ProjectId,
+ OplogId,
+ ChunkHashes[Index],
+ GetRangeResponse.second);
+ }
+ return true;
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_WARN("oplog '{}/{}': failed getting chunk in batch request for chunk {}. Reason: '{}'",
+ ProjectId,
+ OplogId,
+ ChunkHashes[Index],
+ Ex.what());
+ }
+ return true;
+ },
+ &GetSmallWorkerPool(EWorkloadType::Burst),
+ LargeSizeLimit);
+ }
+
+ CbObjectWriter ResponseWriter;
+ if (!ChunkHashes.empty())
+ {
+ ResponseWriter.BeginArray("chunks"sv);
+ for (size_t Index = 0; Index < ChunkHashes.size(); Index++)
+ {
+ CompressedBuffer& Compressed = ChunkBuffers[Index];
+ if (Compressed)
{
- ZEN_WARN("oplog '{}/{}': failed getting chunk in batch request for chunk {}. Reason: '{}'",
- ProjectId,
- OplogId,
- ChunkHashes[Index],
- Ex.what());
+ const IoHash& RawHash = ChunkHashes[Index];
+ ResponseWriter.AddHash(RawHash);
+ ResponsePackage.AddAttachment(CbAttachment(std::move(Compressed), RawHash));
}
- return true;
- },
- &GetSmallWorkerPool(EWorkloadType::Burst));
+ }
+ ResponseWriter.EndArray();
+ }
- CbObjectWriter ResponseWriter;
- ResponseWriter.BeginArray("chunks"sv);
- for (size_t Index = 0; Index < ChunkHashes.size(); Index++)
+ if (!ChunkIds.empty())
{
- CompressedBuffer& Compressed = CompressedBuffers[Index];
- if (Compressed)
+ ResponseWriter.BeginArray("values"sv);
+ for (size_t Index = 0; Index < ChunkIds.size(); Index++)
{
- const IoHash& RawHash = ChunkHashes[Index];
- ResponseWriter.AddHash(RawHash);
- ResponsePackage.AddAttachment(CbAttachment(std::move(Compressed), RawHash));
+ CompositeBuffer& Composite = ValueBuffers[Index].second;
+ if (Composite)
+ {
+ ResponseWriter.BeginObject();
+ const Oid& Id = ChunkIds[Index];
+ ResponseWriter.AddObjectId("id", Id);
+ if (ValueBuffers[Index].first == ZenContentType::kCompressedBinary)
+ {
+ IoHash RawHash;
+ uint64_t _;
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressed(std::move(Composite), RawHash, _);
+ ResponseWriter.AddHash("rawhash", RawHash);
+ ResponsePackage.AddAttachment(CbAttachment(std::move(Compressed), RawHash));
+ }
+ else
+ {
+ IoHash RawHash = IoHash::HashBuffer(Composite);
+ ResponseWriter.AddHash("rawhash", RawHash);
+ ResponsePackage.AddAttachment(CbAttachment(std::move(Composite), RawHash));
+ }
+ ResponseWriter.EndObject();
+ }
}
+ ResponseWriter.EndArray();
}
- ResponseWriter.EndArray();
ResponsePackage.SetObject(ResponseWriter.Save());
}
- CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(ResponsePackage, FormatFlags::kDefault);
+
+ CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(ResponsePackage, Flags, TargetProcessHandle);
HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer);
return true;
}
diff --git a/src/zenserver/projectstore/projectstore.h b/src/zenserver/projectstore/projectstore.h
index 49970b677..52debe271 100644
--- a/src/zenserver/projectstore/projectstore.h
+++ b/src/zenserver/projectstore/projectstore.h
@@ -7,6 +7,7 @@
#include <zencore/xxhash.h>
#include <zenhttp/httpserver.h>
#include <zenstore/gc.h>
+#include <zenutil/openprocesscache.h>
ZEN_THIRD_PARTY_INCLUDES_START
#include <tsl/robin_map.h>
@@ -118,10 +119,12 @@ public:
IoBuffer GetChunkByRawHash(const IoHash& RawHash);
bool IterateChunks(std::span<IoHash> RawHashes,
const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback,
- WorkerThreadPool* OptionalWorkerPool);
+ WorkerThreadPool* OptionalWorkerPool,
+ uint64_t LargeSizeLimit);
bool IterateChunks(std::span<Oid> ChunkIds,
const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback,
- WorkerThreadPool* OptionalWorkerPool);
+ WorkerThreadPool* OptionalWorkerPool,
+ uint64_t LargeSizeLimit);
inline static const uint32_t kInvalidOp = ~0u;
/** Persist a new oplog entry
@@ -469,6 +472,7 @@ private:
mutable RwLock m_UpdateCaptureLock;
uint32_t m_UpdateCaptureRefCounter = 0;
std::unique_ptr<std::vector<std::string>> m_CapturedProjects;
+ OpenProcessCache m_OpenProcessCache;
std::filesystem::path BasePathForProject(std::string_view ProjectId);
diff --git a/src/zenstore/cas.cpp b/src/zenstore/cas.cpp
index bff221fc7..4252fc859 100644
--- a/src/zenstore/cas.cpp
+++ b/src/zenstore/cas.cpp
@@ -59,7 +59,8 @@ public:
virtual void FilterChunks(HashKeySet& InOutChunks) override;
virtual bool IterateChunks(std::span<IoHash> DecompressedIds,
const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback,
- WorkerThreadPool* OptionalWorkerPool) override;
+ WorkerThreadPool* OptionalWorkerPool,
+ uint64_t LargeSizeLimit) override;
virtual void Flush() override;
virtual void ScrubStorage(ScrubContext& Ctx) override;
virtual CidStoreSize TotalSize() const override;
@@ -392,7 +393,8 @@ CasImpl::FilterChunks(HashKeySet& InOutChunks)
bool
CasImpl::IterateChunks(std::span<IoHash> DecompressedIds,
const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback,
- WorkerThreadPool* OptionalWorkerPool)
+ WorkerThreadPool* OptionalWorkerPool,
+ uint64_t LargeSizeLimit)
{
ZEN_TRACE_CPU("CAS::IterateChunks");
if (!m_SmallStrategy.IterateChunks(
@@ -402,7 +404,8 @@ CasImpl::IterateChunks(std::span<IoHash> DecompressedIds,
Chunk.SetContentType(ZenContentType::kCompressedBinary);
return AsyncCallback(Index, Payload);
},
- OptionalWorkerPool))
+ OptionalWorkerPool,
+ LargeSizeLimit))
{
return false;
}
@@ -413,7 +416,8 @@ CasImpl::IterateChunks(std::span<IoHash> DecompressedIds,
Chunk.SetContentType(ZenContentType::kCompressedBinary);
return AsyncCallback(Index, Payload);
},
- OptionalWorkerPool))
+ OptionalWorkerPool,
+ LargeSizeLimit))
{
return false;
}
diff --git a/src/zenstore/cas.h b/src/zenstore/cas.h
index bedbc6a9a..e279dd2cc 100644
--- a/src/zenstore/cas.h
+++ b/src/zenstore/cas.h
@@ -47,7 +47,8 @@ public:
virtual void FilterChunks(HashKeySet& InOutChunks) = 0;
virtual bool IterateChunks(std::span<IoHash> DecompressedIds,
const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback,
- WorkerThreadPool* OptionalWorkerPool) = 0;
+ WorkerThreadPool* OptionalWorkerPool,
+ uint64_t LargeSizeLimit) = 0;
virtual void Flush() = 0;
virtual void ScrubStorage(ScrubContext& Ctx) = 0;
virtual CidStoreSize TotalSize() const = 0;
diff --git a/src/zenstore/cidstore.cpp b/src/zenstore/cidstore.cpp
index 71fd596f4..2ab769d04 100644
--- a/src/zenstore/cidstore.cpp
+++ b/src/zenstore/cidstore.cpp
@@ -119,9 +119,10 @@ struct CidStore::Impl
bool IterateChunks(std::span<IoHash> DecompressedIds,
const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback,
- WorkerThreadPool* OptionalWorkerPool)
+ WorkerThreadPool* OptionalWorkerPool,
+ uint64_t LargeSizeLimit)
{
- return m_CasStore.IterateChunks(DecompressedIds, AsyncCallback, OptionalWorkerPool);
+ return m_CasStore.IterateChunks(DecompressedIds, AsyncCallback, OptionalWorkerPool, LargeSizeLimit);
}
void Flush() { m_CasStore.Flush(); }
@@ -217,9 +218,10 @@ CidStore::ContainsChunk(const IoHash& DecompressedId)
bool
CidStore::IterateChunks(std::span<IoHash> DecompressedIds,
const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback,
- WorkerThreadPool* OptionalWorkerPool)
+ WorkerThreadPool* OptionalWorkerPool,
+ uint64_t LargeSizeLimit)
{
- return m_Impl->IterateChunks(DecompressedIds, AsyncCallback, OptionalWorkerPool);
+ return m_Impl->IterateChunks(DecompressedIds, AsyncCallback, OptionalWorkerPool, LargeSizeLimit);
}
void
diff --git a/src/zenstore/compactcas.cpp b/src/zenstore/compactcas.cpp
index 7f1300177..f479a7173 100644
--- a/src/zenstore/compactcas.cpp
+++ b/src/zenstore/compactcas.cpp
@@ -305,7 +305,8 @@ CasContainerStrategy::FilterChunks(HashKeySet& InOutChunks)
bool
CasContainerStrategy::IterateChunks(std::span<IoHash> ChunkHashes,
const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback,
- WorkerThreadPool* OptionalWorkerPool)
+ WorkerThreadPool* OptionalWorkerPool,
+ uint64_t LargeSizeLimit)
{
std::vector<size_t> FoundChunkIndexes;
std::vector<BlockStoreLocation> FoundChunkLocations;
@@ -332,7 +333,8 @@ CasContainerStrategy::IterateChunks(std::span<IoHash> ChunkHashes,
},
[&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) {
return AsyncCallback(FoundChunkIndexes[ChunkIndex], File.GetChunk(Offset, Size));
- });
+ },
+ LargeSizeLimit);
};
Latch WorkLatch(1);
diff --git a/src/zenstore/compactcas.h b/src/zenstore/compactcas.h
index 44567e7a0..07e620086 100644
--- a/src/zenstore/compactcas.h
+++ b/src/zenstore/compactcas.h
@@ -58,7 +58,8 @@ struct CasContainerStrategy final : public GcStorage, public GcReferenceStore
void FilterChunks(HashKeySet& InOutChunks);
bool IterateChunks(std::span<IoHash> ChunkHashes,
const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback,
- WorkerThreadPool* OptionalWorkerPool);
+ WorkerThreadPool* OptionalWorkerPool,
+ uint64_t LargeSizeLimit);
void Initialize(const std::filesystem::path& RootDirectory,
const std::string_view ContainerBaseName,
uint32_t MaxBlockSize,
diff --git a/src/zenstore/include/zenstore/cidstore.h b/src/zenstore/include/zenstore/cidstore.h
index d95fa7cd4..b3d00fec0 100644
--- a/src/zenstore/include/zenstore/cidstore.h
+++ b/src/zenstore/include/zenstore/cidstore.h
@@ -82,7 +82,8 @@ public:
virtual IoBuffer FindChunkByCid(const IoHash& DecompressedId) override;
bool IterateChunks(std::span<IoHash> DecompressedIds,
const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback,
- WorkerThreadPool* OptionalWorkerPool);
+ WorkerThreadPool* OptionalWorkerPool,
+ uint64_t LargeSizeLimit);
bool ContainsChunk(const IoHash& DecompressedId);
void FilterChunks(HashKeySet& InOutChunks);
void Flush();