about summary refs log tree commit diff
path: root/src/zenserver/projectstore/projectstore.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2024-11-11 09:46:09 +0100
committerDan Engelbrecht <[email protected]>2024-11-11 09:46:09 +0100
commit4584fd6e56fa5c5a7428828e7d3aea4e25a17977 (patch)
treec7f0ae3ea387585fb167fb9f5dfc3ecad8918e34 /src/zenserver/projectstore/projectstore.cpp
parentuse IterateChunks for "getchunks" projectstore rpc request (diff)
downloadzen-de/improved-projectstore-batch-requests.tar.xz
zen-de/improved-projectstore-batch-requests.zip
allow control of size for batch iteration
allow adding compositebuffers as attachments directly
add batch2 httpstore api to allow batching of CAS & Oid with range requests
allow responses with file handles from project store

Signed-off-by: Dan Engelbrecht <[email protected]>
Diffstat (limited to 'src/zenserver/projectstore/projectstore.cpp')
-rw-r--r--src/zenserver/projectstore/projectstore.cpp319
1 files changed, 245 insertions, 74 deletions
diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp
index 5d9d7aa24..c2bddf532 100644
--- a/src/zenserver/projectstore/projectstore.cpp
+++ b/src/zenserver/projectstore/projectstore.cpp
@@ -1799,15 +1799,17 @@ ProjectStore::Oplog::GetChunkByRawHash(const IoHash& RawHash)
bool
ProjectStore::Oplog::IterateChunks(std::span<IoHash> RawHashes,
const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback,
- WorkerThreadPool* OptionalWorkerPool)
+ WorkerThreadPool* OptionalWorkerPool,
+ uint64_t LargeSizeLimit)
{
- return m_CidStore.IterateChunks(RawHashes, AsyncCallback, OptionalWorkerPool);
+ return m_CidStore.IterateChunks(RawHashes, AsyncCallback, OptionalWorkerPool, LargeSizeLimit);
}
bool
ProjectStore::Oplog::IterateChunks(std::span<Oid> ChunkIds,
const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback,
- WorkerThreadPool* OptionalWorkerPool)
+ WorkerThreadPool* OptionalWorkerPool,
+ uint64_t LargeSizeLimit)
{
std::vector<size_t> CidChunkIndexes;
std::vector<IoHash> CidChunkHashes;
@@ -1838,7 +1840,8 @@ ProjectStore::Oplog::IterateChunks(std::span<Oid> ChunkIds,
m_CidStore.IterateChunks(
CidChunkHashes,
[&](size_t Index, const IoBuffer& Payload) { return AsyncCallback(CidChunkIndexes[Index], Payload); },
- OptionalWorkerPool);
+ OptionalWorkerPool,
+ LargeSizeLimit);
if (OptionalWorkerPool)
{
@@ -3807,7 +3810,8 @@ ProjectStore::GetProjectFiles(const std::string_view ProjectId,
}
return true;
},
- &GetSmallWorkerPool(EWorkloadType::Burst));
+ &GetSmallWorkerPool(EWorkloadType::Burst),
+ 0);
}
CbObjectWriter Response;
@@ -3931,7 +3935,8 @@ ProjectStore::GetProjectChunkInfos(const std::string_view ProjectId,
}
return true;
},
- &WorkerPool);
+ &WorkerPool,
+ 0);
}
CbObjectWriter Response;
@@ -4040,38 +4045,19 @@ ProjectStore::GetChunkRange(const std::string_view ProjectId,
return GetChunkRange(ProjectId, OplogId, Obj, Offset, Size, AcceptType, OutChunk, OutContentType);
}
-std::pair<HttpResponseCode, std::string>
-ProjectStore::GetChunkRange(const std::string_view ProjectId,
- const std::string_view OplogId,
- Oid ChunkId,
- uint64_t Offset,
- uint64_t Size,
- ZenContentType AcceptType,
- CompositeBuffer& OutChunk,
- ZenContentType& OutContentType)
+static std::pair<HttpResponseCode, std::string>
+GetChunkRange(IoBuffer&& Chunk,
+ const std::string_view ProjectId,
+ const std::string_view OplogId,
+ const std::string_view ChunkId,
+ uint64_t Offset,
+ uint64_t Size,
+ ZenContentType AcceptType,
+ CompositeBuffer& OutChunk,
+ ZenContentType& OutContentType)
{
bool IsOffset = Offset != 0 || Size != ~(0ull);
- Ref<ProjectStore::Project> Project = OpenProject(ProjectId);
- if (!Project)
- {
- return {HttpResponseCode::NotFound, fmt::format("Chunk request for unknown project '{}'", ProjectId)};
- }
- Project->TouchProject();
-
- ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId, /*AllowCompact*/ true, /*VerifyPathOnDisk*/ false);
- if (!FoundLog)
- {
- return {HttpResponseCode::NotFound, fmt::format("Chunk request for unknown oplog '{}/{}'", ProjectId, OplogId)};
- }
- Project->TouchOplog(OplogId);
-
- IoBuffer Chunk = FoundLog->FindChunk(ChunkId);
- if (!Chunk)
- {
- return {HttpResponseCode::NotFound, {}};
- }
-
OutContentType = Chunk.GetContentType();
if (OutContentType == ZenContentType::kCompressedBinary)
@@ -4151,11 +4137,43 @@ ProjectStore::GetChunkRange(const std::string_view ProjectId,
{
OutChunk = CompositeBuffer(SharedBuffer(std::move(Chunk)));
}
-
return {HttpResponseCode::OK, {}};
}
std::pair<HttpResponseCode, std::string>
+ProjectStore::GetChunkRange(const std::string_view ProjectId,
+ const std::string_view OplogId,
+ Oid ChunkId,
+ uint64_t Offset,
+ uint64_t Size,
+ ZenContentType AcceptType,
+ CompositeBuffer& OutChunk,
+ ZenContentType& OutContentType)
+{
+ Ref<ProjectStore::Project> Project = OpenProject(ProjectId);
+ if (!Project)
+ {
+ return {HttpResponseCode::NotFound, fmt::format("Chunk request for unknown project '{}'", ProjectId)};
+ }
+ Project->TouchProject();
+
+ ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId, /*AllowCompact*/ true, /*VerifyPathOnDisk*/ false);
+ if (!FoundLog)
+ {
+ return {HttpResponseCode::NotFound, fmt::format("Chunk request for unknown oplog '{}/{}'", ProjectId, OplogId)};
+ }
+ Project->TouchOplog(OplogId);
+
+ IoBuffer Chunk = FoundLog->FindChunk(ChunkId);
+ if (!Chunk)
+ {
+ return {HttpResponseCode::NotFound, {}};
+ }
+
+ return zen::GetChunkRange(std::move(Chunk), ProjectId, OplogId, ChunkId.ToString(), Offset, Size, AcceptType, OutChunk, OutContentType);
+}
+
+std::pair<HttpResponseCode, std::string>
ProjectStore::GetChunk(const std::string_view ProjectId,
const std::string_view OplogId,
const std::string_view Cid,
@@ -4512,66 +4530,219 @@ ProjectStore::Rpc(HttpServerRequest& HttpReq,
{
ZEN_TRACE_CPU("Store::Rpc::getchunks");
CbPackage ResponsePackage;
+
+ uint64_t LargeSizeLimit = 0; // Go with default in-memory size
+
+ RpcAcceptOptions AcceptFlags = static_cast<RpcAcceptOptions>(Cb["AcceptFlags"sv].AsUInt16(0u));
+ FormatFlags Flags = FormatFlags::kDefault;
+ int TargetPid = Cb["Pid"sv].AsInt32(0);
+ void* TargetProcessHandle = nullptr;
+
+ if (EnumHasAllFlags(AcceptFlags, RpcAcceptOptions::kAllowLocalReferences))
+ {
+ Flags |= FormatFlags::kAllowLocalReferences;
+ TargetProcessHandle = m_OpenProcessCache.GetProcessHandle(HttpReq.SessionId(), TargetPid);
+ if (!EnumHasAnyFlags(AcceptFlags, RpcAcceptOptions::kAllowPartialLocalReferences))
+ {
+ Flags |= FormatFlags::kDenyPartialLocalReferences;
+ }
+ else if (TargetProcessHandle != nullptr)
+ {
+ // If we allow partial references to blocks, only put very small chunks in memory
+ LargeSizeLimit = 1024u;
+ }
+ }
+
{
- CbArrayView ChunksArray = Cb["chunks"sv].AsArrayView();
- std::vector<IoHash> ChunkHashes;
+ CbArrayView ChunksArray = Cb["chunks"sv].AsArrayView();
+ std::vector<IoHash> ChunkHashes;
+ std::vector<size_t> ChunkHashIndexes;
+ std::vector<Oid> ChunkIds;
+ std::vector<size_t> ChunkIdIndexes;
+ std::vector<uint64_t> ChunkRangeOffsets;
+ std::vector<uint64_t> ChunkRangeSizes;
+
ChunkHashes.reserve(ChunksArray.Num());
+ size_t IndexOffset = 0;
for (CbFieldView FieldView : ChunksArray)
{
- ChunkHashes.push_back(FieldView.AsHash());
+ if (FieldView.IsHash())
+ {
+ ChunkHashes.push_back(FieldView.AsHash());
+ ChunkHashIndexes.push_back(IndexOffset);
+ }
+ else if (FieldView.IsObjectId())
+ {
+ ChunkIds.push_back(FieldView.AsObjectId());
+ ChunkIdIndexes.push_back(IndexOffset);
+ }
+ IndexOffset++;
}
+ CbArrayView RangeOffsetsArray = Cb["offsets"sv].AsArrayView();
+ ChunkRangeOffsets.reserve(ChunksArray.Num());
+ for (CbFieldView FieldView : RangeOffsetsArray)
+ {
+ ChunkRangeOffsets.push_back(FieldView.AsUInt64(0));
+ }
+ ChunkRangeOffsets.resize(ChunksArray.Num(), 0);
+ CbArrayView RangeSizesArray = Cb["sizes"sv].AsArrayView();
+ ChunkRangeSizes.reserve(ChunksArray.Num());
+ for (CbFieldView FieldView : RangeSizesArray)
+ {
+ ChunkRangeSizes.push_back(FieldView.AsUInt64((uint64_t)-1));
+ }
+ ChunkRangeSizes.resize(ChunksArray.Num(), (uint64_t)-1);
- std::vector<CompressedBuffer> CompressedBuffers;
- CompressedBuffers.resize(ChunksArray.Num());
- (void)m_CidStore.IterateChunks(
- ChunkHashes,
- [&](size_t Index, const IoBuffer& Payload) {
- try
- {
- IoHash RawHash;
- uint64_t RawSize;
- CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Payload), RawHash, RawSize);
- if (Compressed && ChunkHashes[Index] == RawHash)
+ std::vector<CompressedBuffer> ChunkBuffers;
+ if (!ChunkHashes.empty())
+ {
+ ChunkBuffers.resize(ChunkHashes.size());
+ (void)m_CidStore.IterateChunks(
+ ChunkHashes,
+ [&](size_t Index, const IoBuffer& Payload) {
+ try
{
- CompressedBuffers[Index] = Compressed.MakeOwned();
+ size_t RequestIndex = ChunkHashIndexes[Index];
+ ZenContentType ContentType;
+ CompositeBuffer Chunk;
+ std::pair<HttpResponseCode, std::string> GetRangeResponse =
+ zen::GetChunkRange(IoBuffer(Payload),
+ ProjectId,
+ OplogId,
+ ChunkHashes[Index].ToHexString(),
+ ChunkRangeOffsets[RequestIndex],
+ ChunkRangeSizes[RequestIndex],
+ ZenContentType::kCompressedBinary,
+ Chunk,
+ ContentType);
+ if (GetRangeResponse.first == HttpResponseCode::OK)
+ {
+ ZEN_ASSERT(ContentType == ZenContentType::kCompressedBinary);
+ ChunkBuffers[Index] = CompressedBuffer::FromCompressedNoValidate(std::move(Chunk)).MakeOwned();
+ }
+ else
+ {
+ ZEN_WARN("oplog '{}/{}': failed getting chunk in batch request for chunk {}. Reason: '{}'",
+ ProjectId,
+ OplogId,
+ ChunkHashes[Index],
+ GetRangeResponse.second);
+ }
+ return true;
}
- else
+ catch (const std::exception& Ex)
{
- ZEN_WARN("oplog '{}/{}': invalid compressed binary in cas store for {}",
+ ZEN_WARN("oplog '{}/{}': failed getting chunk in batch request for chunk {}. Reason: '{}'",
ProjectId,
OplogId,
- ChunkHashes[Index]);
+ ChunkHashes[Index],
+ Ex.what());
}
return true;
- }
- catch (const std::exception& Ex)
+ },
+ &GetSmallWorkerPool(EWorkloadType::Burst),
+ LargeSizeLimit);
+ }
+ std::vector<std::pair<ZenContentType, CompositeBuffer>> ValueBuffers;
+ if (!ChunkIds.empty())
+ {
+ ValueBuffers.resize(ChunkIds.size());
+ Oplog->IterateChunks(
+ ChunkIds,
+ [&](size_t Index, const IoBuffer& Payload) {
+ try
+ {
+ size_t RequestIndex = ChunkIdIndexes[Index];
+ CompositeBuffer Chunk;
+ std::pair<HttpResponseCode, std::string> GetRangeResponse =
+ zen::GetChunkRange(IoBuffer(Payload),
+ ProjectId,
+ OplogId,
+ ChunkHashes[Index].ToHexString(),
+ ChunkRangeOffsets[RequestIndex],
+ ChunkRangeSizes[RequestIndex],
+ ZenContentType::kCompressedBinary,
+ Chunk,
+ ValueBuffers[Index].first);
+ if (GetRangeResponse.first == HttpResponseCode::OK)
+ {
+ ValueBuffers[Index].second = std::move(Chunk).MakeOwned();
+ }
+ else
+ {
+ ZEN_WARN("oplog '{}/{}': failed getting chunk in batch request for chunk {}. Reason: '{}'",
+ ProjectId,
+ OplogId,
+ ChunkHashes[Index],
+ GetRangeResponse.second);
+ }
+ return true;
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_WARN("oplog '{}/{}': failed getting chunk in batch request for chunk {}. Reason: '{}'",
+ ProjectId,
+ OplogId,
+ ChunkHashes[Index],
+ Ex.what());
+ }
+ return true;
+ },
+ &GetSmallWorkerPool(EWorkloadType::Burst),
+ LargeSizeLimit);
+ }
+
+ CbObjectWriter ResponseWriter;
+ if (!ChunkHashes.empty())
+ {
+ ResponseWriter.BeginArray("chunks"sv);
+ for (size_t Index = 0; Index < ChunkHashes.size(); Index++)
+ {
+ CompressedBuffer& Compressed = ChunkBuffers[Index];
+ if (Compressed)
{
- ZEN_WARN("oplog '{}/{}': failed getting chunk in batch request for chunk {}. Reason: '{}'",
- ProjectId,
- OplogId,
- ChunkHashes[Index],
- Ex.what());
+ const IoHash& RawHash = ChunkHashes[Index];
+ ResponseWriter.AddHash(RawHash);
+ ResponsePackage.AddAttachment(CbAttachment(std::move(Compressed), RawHash));
}
- return true;
- },
- &GetSmallWorkerPool(EWorkloadType::Burst));
+ }
+ ResponseWriter.EndArray();
+ }
- CbObjectWriter ResponseWriter;
- ResponseWriter.BeginArray("chunks"sv);
- for (size_t Index = 0; Index < ChunkHashes.size(); Index++)
+ if (!ChunkIds.empty())
{
- CompressedBuffer& Compressed = CompressedBuffers[Index];
- if (Compressed)
+ ResponseWriter.BeginArray("values"sv);
+ for (size_t Index = 0; Index < ChunkIds.size(); Index++)
{
- const IoHash& RawHash = ChunkHashes[Index];
- ResponseWriter.AddHash(RawHash);
- ResponsePackage.AddAttachment(CbAttachment(std::move(Compressed), RawHash));
+ CompositeBuffer& Composite = ValueBuffers[Index].second;
+ if (Composite)
+ {
+ ResponseWriter.BeginObject();
+ const Oid& Id = ChunkIds[Index];
+ ResponseWriter.AddObjectId("id", Id);
+ if (ValueBuffers[Index].first == ZenContentType::kCompressedBinary)
+ {
+ IoHash RawHash;
+ uint64_t _;
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressed(std::move(Composite), RawHash, _);
+ ResponseWriter.AddHash("rawhash", RawHash);
+ ResponsePackage.AddAttachment(CbAttachment(std::move(Compressed), RawHash));
+ }
+ else
+ {
+ IoHash RawHash = IoHash::HashBuffer(Composite);
+ ResponseWriter.AddHash("rawhash", RawHash);
+ ResponsePackage.AddAttachment(CbAttachment(std::move(Composite), RawHash));
+ }
+ ResponseWriter.EndObject();
+ }
}
+ ResponseWriter.EndArray();
}
- ResponseWriter.EndArray();
ResponsePackage.SetObject(ResponseWriter.Save());
}
- CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(ResponsePackage, FormatFlags::kDefault);
+
+ CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(ResponsePackage, Flags, TargetProcessHandle);
HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer);
return true;
}