diff options
| author | Dan Engelbrecht <[email protected]> | 2024-11-11 09:46:09 +0100 |
|---|---|---|
| committer | Dan Engelbrecht <[email protected]> | 2024-11-11 09:46:09 +0100 |
| commit | 4584fd6e56fa5c5a7428828e7d3aea4e25a17977 (patch) | |
| tree | c7f0ae3ea387585fb167fb9f5dfc3ecad8918e34 /src/zenserver/projectstore/httpprojectstore.cpp | |
| parent | use IterateChunks for "getchunks" projectstore rpc request (diff) | |
| download | zen-de/improved-projectstore-batch-requests.tar.xz zen-de/improved-projectstore-batch-requests.zip | |
allow control of size for batch iteration
allow adding compositebuffers as attachments directly
add batch2 httpstore api to allow batching of CAS & Oid with range requests
allow responses with file handles from project store
Signed-off-by: Dan Engelbrecht <[email protected]>
Diffstat (limited to 'src/zenserver/projectstore/httpprojectstore.cpp')
| -rw-r--r-- | src/zenserver/projectstore/httpprojectstore.cpp | 170 |
1 files changed, 170 insertions, 0 deletions
diff --git a/src/zenserver/projectstore/httpprojectstore.cpp b/src/zenserver/projectstore/httpprojectstore.cpp index 1b45e66f3..d2e138791 100644 --- a/src/zenserver/projectstore/httpprojectstore.cpp +++ b/src/zenserver/projectstore/httpprojectstore.cpp @@ -14,6 +14,7 @@ #include <zencore/stream.h> #include <zencore/trace.h> #include <zenstore/zenstore.h> +#include <zenutil/workerpools.h> namespace zen { @@ -257,6 +258,11 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects, HttpVerb::kPost); m_Router.RegisterRoute( + "{project}/oplog/{log}/batch2", + [this](HttpRouterRequest& Req) { HandleChunkBatch2Request(Req); }, + HttpVerb::kPost); + + m_Router.RegisterRoute( "{project}/oplog/{log}/files", [this](HttpRouterRequest& Req) { HandleFilesRequest(Req); }, HttpVerb::kGet); @@ -456,6 +462,170 @@ HttpProjectService::HandleProjectListRequest(HttpRouterRequest& Req) } void +HttpProjectService::HandleChunkBatch2Request(HttpRouterRequest& Req) +{ + ZEN_TRACE_CPU("ProjectService::ChunkBatch2"); + + HttpServerRequest& HttpReq = Req.ServerRequest(); + const auto& ProjectId = Req.GetCapture(1); + const auto& OplogId = Req.GetCapture(2); + + Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId); + if (!Project) + { + return HttpReq.WriteResponse(HttpResponseCode::NotFound); + } + Project->TouchProject(); + + ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId, /*AllowCompact*/ true, /*VerifyPathOnDisk*/ false); + if (!FoundLog) + { + return HttpReq.WriteResponse(HttpResponseCode::NotFound); + } + Project->TouchOplog(OplogId); + + // Parse Request + + IoBuffer Payload = HttpReq.ReadPayload(); + BinaryReader Reader(Payload); + + struct RequestHeader + { + enum + { + kMagic = 0xAAAA'77AC + }; + uint32_t Magic; + uint32_t ChunkCount; + uint32_t Reserved1; + uint32_t Reserved2; + }; + + struct RequestChunkEntry + { + Oid ChunkId; + uint32_t CorrelationId; + uint64_t Offset; + uint64_t RequestBytes; + }; + + if (Payload.Size() <= sizeof(RequestHeader)) + { + m_ProjectStats.BadRequestCount++; + HttpReq.WriteResponse(HttpResponseCode::BadRequest); + } + + RequestHeader RequestHdr; + Reader.Read(&RequestHdr, sizeof RequestHdr); + + if (RequestHdr.Magic != RequestHeader::kMagic) + { + m_ProjectStats.BadRequestCount++; + HttpReq.WriteResponse(HttpResponseCode::BadRequest); + } + + std::vector<RequestChunkEntry> RequestedChunks; + RequestedChunks.resize(RequestHdr.ChunkCount); + Reader.Read(RequestedChunks.data(), sizeof(RequestChunkEntry) * RequestHdr.ChunkCount); + + // Make Response + + struct ResponseHeader + { + uint32_t Magic = 0xbada'b00f; + uint32_t ChunkCount; + uint32_t Reserved1 = 0; + uint32_t Reserved2 = 0; + }; + + struct ResponseChunkEntry + { + uint32_t CorrelationId; + uint32_t Flags = 0; + uint64_t ChunkSize; + }; + + std::vector<IoBuffer> OutBlobs; + OutBlobs.emplace_back(sizeof(ResponseHeader) + RequestHdr.ChunkCount * sizeof(ResponseChunkEntry)); + + std::vector<Oid> ChunkIds; + (void)FoundLog->IterateChunks( + ChunkIds, + [&](size_t Index, const IoBuffer& Payload) { + try + { + const RequestChunkEntry& RequestedChunk = RequestedChunks[Index]; + if (Payload.GetContentType() == ZenContentType::kCompressedBinary) + { + IoHash RawHash; + uint64_t RawSize; + if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Payload), RawHash, RawSize)) + { + if (SharedBuffer Decompressed = Compressed.Decompress(RequestedChunk.Offset, RequestedChunk.RequestBytes)) + { + OutBlobs[Index + 1] = Decompressed.AsIoBuffer(); + } + } + } + else + { + if (RequestedChunk.Offset > 0 || RequestedChunk.RequestBytes < uint64_t(-1)) + { + uint64_t Offset = RequestedChunk.Offset; + if (Offset > Payload.Size()) + { + Offset = Payload.Size(); + } + uint64_t Size = RequestedChunk.RequestBytes; + if ((Offset + Size) > Payload.Size()) + { + Size = Payload.Size() - Offset; + } + OutBlobs[Index + 1] = IoBuffer(Payload, Offset, Size); + OutBlobs[Index + 1].MakeOwned(); + } + } + return true; + } + catch (const std::exception& Ex) + { + ZEN_WARN("oplog '{}/{}': failed getting chunk in batch request for id {}. Reason: '{}'", + ProjectId, + OplogId, + ChunkIds[Index], + Ex.what()); + } + return true; + }, + &GetSmallWorkerPool(EWorkloadType::Burst), + 0); + + uint8_t* ResponsePtr = reinterpret_cast<uint8_t*>(OutBlobs[0].MutableData()); + ResponseHeader ResponseHdr; + ResponseHdr.ChunkCount = RequestHdr.ChunkCount; + memcpy(ResponsePtr, &ResponseHdr, sizeof(ResponseHdr)); + ResponsePtr += sizeof(ResponseHdr); + for (uint32_t ChunkIndex = 0; ChunkIndex < RequestHdr.ChunkCount; ++ChunkIndex) + { + const IoBuffer& FoundChunk(OutBlobs[ChunkIndex + 1]); + ResponseChunkEntry ResponseChunk; + ResponseChunk.CorrelationId = RequestedChunks[ChunkIndex].CorrelationId; + if (FoundChunk) + { + ResponseChunk.ChunkSize = FoundChunk.Size(); + } + else + { + ResponseChunk.ChunkSize = uint64_t(-1); + } + memcpy(ResponsePtr, &ResponseChunk, sizeof(ResponseChunk)); + ResponsePtr += sizeof(ResponseChunk); + } + m_ProjectStats.ChunkHitCount += RequestHdr.ChunkCount; + return HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kBinary, OutBlobs); +} + +void HttpProjectService::HandleChunkBatchRequest(HttpRouterRequest& Req) { ZEN_TRACE_CPU("ProjectService::ChunkBatch"); |