aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver/projectstore/httpprojectstore.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2024-11-11 09:46:09 +0100
committerDan Engelbrecht <[email protected]>2024-11-11 09:46:09 +0100
commit4584fd6e56fa5c5a7428828e7d3aea4e25a17977 (patch)
treec7f0ae3ea387585fb167fb9f5dfc3ecad8918e34 /src/zenserver/projectstore/httpprojectstore.cpp
parentuse IterateChunks for "getchunks" projectstore rpc request (diff)
downloadzen-de/improved-projectstore-batch-requests.tar.xz
zen-de/improved-projectstore-batch-requests.zip
allow control of size for batch iteration allow adding compositebuffers as attachments directly add batch2 httpstore api to allow batching of CAS & Oid with range requests allow responses with file handles from project store Signed-off-by: Dan Engelbrecht <[email protected]>
Diffstat (limited to 'src/zenserver/projectstore/httpprojectstore.cpp')
-rw-r--r--src/zenserver/projectstore/httpprojectstore.cpp170
1 files changed, 170 insertions, 0 deletions
diff --git a/src/zenserver/projectstore/httpprojectstore.cpp b/src/zenserver/projectstore/httpprojectstore.cpp
index 1b45e66f3..d2e138791 100644
--- a/src/zenserver/projectstore/httpprojectstore.cpp
+++ b/src/zenserver/projectstore/httpprojectstore.cpp
@@ -14,6 +14,7 @@
#include <zencore/stream.h>
#include <zencore/trace.h>
#include <zenstore/zenstore.h>
+#include <zenutil/workerpools.h>
namespace zen {
@@ -257,6 +258,11 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects,
HttpVerb::kPost);
m_Router.RegisterRoute(
+ "{project}/oplog/{log}/batch2",
+ [this](HttpRouterRequest& Req) { HandleChunkBatch2Request(Req); },
+ HttpVerb::kPost);
+
+ m_Router.RegisterRoute(
"{project}/oplog/{log}/files",
[this](HttpRouterRequest& Req) { HandleFilesRequest(Req); },
HttpVerb::kGet);
@@ -456,6 +462,170 @@ HttpProjectService::HandleProjectListRequest(HttpRouterRequest& Req)
}
void
+HttpProjectService::HandleChunkBatch2Request(HttpRouterRequest& Req)
+{
+ ZEN_TRACE_CPU("ProjectService::ChunkBatch2");
+
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+ const auto& ProjectId = Req.GetCapture(1);
+ const auto& OplogId = Req.GetCapture(2);
+
+ Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId);
+ if (!Project)
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound);
+ }
+ Project->TouchProject();
+
+ ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId, /*AllowCompact*/ true, /*VerifyPathOnDisk*/ false);
+ if (!FoundLog)
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound);
+ }
+ Project->TouchOplog(OplogId);
+
+ // Parse Request
+
+ IoBuffer Payload = HttpReq.ReadPayload();
+ BinaryReader Reader(Payload);
+
+ struct RequestHeader
+ {
+ enum
+ {
+ kMagic = 0xAAAA'77AC
+ };
+ uint32_t Magic;
+ uint32_t ChunkCount;
+ uint32_t Reserved1;
+ uint32_t Reserved2;
+ };
+
+ struct RequestChunkEntry
+ {
+ Oid ChunkId;
+ uint32_t CorrelationId;
+ uint64_t Offset;
+ uint64_t RequestBytes;
+ };
+
+ if (Payload.Size() <= sizeof(RequestHeader))
+ {
+ m_ProjectStats.BadRequestCount++;
+ HttpReq.WriteResponse(HttpResponseCode::BadRequest);
+ }
+
+ RequestHeader RequestHdr;
+ Reader.Read(&RequestHdr, sizeof RequestHdr);
+
+ if (RequestHdr.Magic != RequestHeader::kMagic)
+ {
+ m_ProjectStats.BadRequestCount++;
+ HttpReq.WriteResponse(HttpResponseCode::BadRequest);
+ }
+
+ std::vector<RequestChunkEntry> RequestedChunks;
+ RequestedChunks.resize(RequestHdr.ChunkCount);
+ Reader.Read(RequestedChunks.data(), sizeof(RequestChunkEntry) * RequestHdr.ChunkCount);
+
+ // Make Response
+
+ struct ResponseHeader
+ {
+ uint32_t Magic = 0xbada'b00f;
+ uint32_t ChunkCount;
+ uint32_t Reserved1 = 0;
+ uint32_t Reserved2 = 0;
+ };
+
+ struct ResponseChunkEntry
+ {
+ uint32_t CorrelationId;
+ uint32_t Flags = 0;
+ uint64_t ChunkSize;
+ };
+
+ std::vector<IoBuffer> OutBlobs;
+ OutBlobs.emplace_back(sizeof(ResponseHeader) + RequestHdr.ChunkCount * sizeof(ResponseChunkEntry));
+
+ std::vector<Oid> ChunkIds;
+ (void)FoundLog->IterateChunks(
+ ChunkIds,
+ [&](size_t Index, const IoBuffer& Payload) {
+ try
+ {
+ const RequestChunkEntry& RequestedChunk = RequestedChunks[Index];
+ if (Payload.GetContentType() == ZenContentType::kCompressedBinary)
+ {
+ IoHash RawHash;
+ uint64_t RawSize;
+ if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Payload), RawHash, RawSize))
+ {
+ if (SharedBuffer Decompressed = Compressed.Decompress(RequestedChunk.Offset, RequestedChunk.RequestBytes))
+ {
+ OutBlobs[Index + 1] = Decompressed.AsIoBuffer();
+ }
+ }
+ }
+ else
+ {
+ if (RequestedChunk.Offset > 0 || RequestedChunk.RequestBytes < uint64_t(-1))
+ {
+ uint64_t Offset = RequestedChunk.Offset;
+ if (Offset > Payload.Size())
+ {
+ Offset = Payload.Size();
+ }
+ uint64_t Size = RequestedChunk.RequestBytes;
+ if ((Offset + Size) > Payload.Size())
+ {
+ Size = Payload.Size() - Offset;
+ }
+ OutBlobs[Index + 1] = IoBuffer(Payload, Offset, Size);
+ OutBlobs[Index + 1].MakeOwned();
+ }
+ }
+ return true;
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_WARN("oplog '{}/{}': failed getting chunk in batch request for id {}. Reason: '{}'",
+ ProjectId,
+ OplogId,
+ ChunkIds[Index],
+ Ex.what());
+ }
+ return true;
+ },
+ &GetSmallWorkerPool(EWorkloadType::Burst),
+ 0);
+
+ uint8_t* ResponsePtr = reinterpret_cast<uint8_t*>(OutBlobs[0].MutableData());
+ ResponseHeader ResponseHdr;
+ ResponseHdr.ChunkCount = RequestHdr.ChunkCount;
+ memcpy(ResponsePtr, &ResponseHdr, sizeof(ResponseHdr));
+ ResponsePtr += sizeof(ResponseHdr);
+ for (uint32_t ChunkIndex = 0; ChunkIndex < RequestHdr.ChunkCount; ++ChunkIndex)
+ {
+ const IoBuffer& FoundChunk(OutBlobs[ChunkIndex + 1]);
+ ResponseChunkEntry ResponseChunk;
+ ResponseChunk.CorrelationId = RequestedChunks[ChunkIndex].CorrelationId;
+ if (FoundChunk)
+ {
+ ResponseChunk.ChunkSize = FoundChunk.Size();
+ }
+ else
+ {
+ ResponseChunk.ChunkSize = uint64_t(-1);
+ }
+ memcpy(ResponsePtr, &ResponseChunk, sizeof(ResponseChunk));
+ ResponsePtr += sizeof(ResponseChunk);
+ }
+ m_ProjectStats.ChunkHitCount += RequestHdr.ChunkCount;
+ return HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kBinary, OutBlobs);
+}
+
+void
HttpProjectService::HandleChunkBatchRequest(HttpRouterRequest& Req)
{
ZEN_TRACE_CPU("ProjectService::ChunkBatch");