aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLiam Mitchell <[email protected]>2024-11-29 17:44:15 -0800
committerLiam Mitchell <[email protected]>2024-12-02 17:56:38 -0800
commit2d729f0aba4cd8b76e3aff45c8aed52b29c9f9af (patch)
treeb3df6d3187f379eed741d9db686108ee83016d46
parent5.5.12 (diff)
downloadzen-lm/batch-request-content-type.tar.xz
zen-lm/batch-request-content-type.zip
Return the proper range of bytes in chunk batch requests based on chunklm/batch-request-content-type
content type and requested content type.
-rw-r--r--src/zenserver-test/zenserver-test.cpp159
-rw-r--r--src/zenserver/projectstore/httpprojectstore.cpp58
2 files changed, 172 insertions, 45 deletions
diff --git a/src/zenserver-test/zenserver-test.cpp b/src/zenserver-test/zenserver-test.cpp
index ca2257361..9fc4ca323 100644
--- a/src/zenserver-test/zenserver-test.cpp
+++ b/src/zenserver-test/zenserver-test.cpp
@@ -359,48 +359,77 @@ TEST_CASE("project.basic")
zen::CompressedBuffer Attachment = zen::CompressedBuffer::Compress(zen::SharedBuffer::Clone(zen::MemoryView{AttachData, 3}));
zen::CbAttachment Attach{Attachment, Attachment.DecodeRawHash()};
- zen::CbObjectWriter OpWriter;
- OpWriter << "key"
- << "foo"
- << "attachment" << Attach;
-
- const std::string_view ChunkId{
- "00000000"
- "00000000"
- "00010000"};
- auto FileOid = zen::Oid::FromHexString(ChunkId);
-
- OpWriter.BeginArray("files");
- OpWriter.BeginObject();
- OpWriter << "id" << FileOid;
- OpWriter << "clientpath"
- << "/{engine}/client/side/path";
- OpWriter << "serverpath" << BinPath.c_str();
- OpWriter.EndObject();
- OpWriter.EndArray();
-
- zen::CbObject Op = OpWriter.Save();
-
- zen::CbPackage OpPackage(Op);
- OpPackage.AddAttachment(Attach);
-
- zen::BinaryWriter MemOut;
- legacy::SaveCbPackage(OpPackage, MemOut);
+ constexpr int ChunkCount = 2;
+
+ const std::string_view ChunkIds[ChunkCount] = {
+ {"00000000"
+ "00000000"
+ "00010000"},
+ {"00000000"
+ "00000000"
+ "00020000"},
+ };
+
+ zen::Oid FileOids[ChunkCount] = {
+ zen::Oid::FromHexString(ChunkIds[0]),
+ zen::Oid::FromHexString(ChunkIds[1]),
+ };
+
+ zen::CbObject Ops[ChunkCount];
+ zen::CbObjectWriter OpWriters[ChunkCount];
+
+ OpWriters[0] << "key"
+ << "foo"
+ << "attachment" << Attach;
+
+ OpWriters[0].BeginArray("files");
+ OpWriters[0].BeginObject();
+ OpWriters[0] << "id" << FileOids[0];
+ OpWriters[0] << "clientpath"
+ << "/{engine}/client/side/path";
+ OpWriters[0] << "serverpath" << BinPath.c_str();
+ OpWriters[0].EndObject();
+ OpWriters[0].EndArray();
+
+ OpWriters[1] << "key"
+ << "bar"
+ << "attachment" << Attach;
+
+ OpWriters[1].BeginArray("packagedata");
+ OpWriters[1].BeginObject();
+ OpWriters[1] << "id" << FileOids[1];
+ OpWriters[1] << "data" << Attach;
+ OpWriters[1].EndObject();
+ OpWriters[1].EndArray();
+
+ zen::BinaryWriter Out[ChunkCount];
+
+ for (int i = 0; i < ChunkCount; ++i)
+ {
+ zen::CbObject Op = OpWriters[i].Save();
+ zen::CbPackage OpPackage(Op);
+ OpPackage.AddAttachment(Attach);
+ legacy::SaveCbPackage(OpPackage, Out[i]);
+ }
{
zen::StringBuilder<64> PostUri;
PostUri << BaseUri << "/new";
- auto Response = cpr::Post(cpr::Url{PostUri.c_str()}, cpr::Body{(const char*)MemOut.Data(), MemOut.Size()});
- REQUIRE(!Response.error);
- CHECK(Response.status_code == 201);
+ for (const zen::BinaryWriter& Writer : Out)
+ {
+ auto Response = cpr::Post(cpr::Url{PostUri.c_str()}, cpr::Body{(const char*)Writer.Data(), Writer.Size()});
+
+ REQUIRE(!Response.error);
+ CHECK(Response.status_code == 201);
+ }
}
// Read file data
{
zen::StringBuilder<128> ChunkGetUri;
- ChunkGetUri << BaseUri << "/" << ChunkId;
+ ChunkGetUri << BaseUri << "/" << ChunkIds[0];
auto Response = cpr::Get(cpr::Url{ChunkGetUri.c_str()});
REQUIRE(!Response.error);
@@ -409,7 +438,7 @@ TEST_CASE("project.basic")
{
zen::StringBuilder<128> ChunkGetUri;
- ChunkGetUri << BaseUri << "/" << ChunkId << "?offset=1&size=10";
+ ChunkGetUri << BaseUri << "/" << ChunkIds[0] << "?offset=1&size=10";
auto Response = cpr::Get(cpr::Url{ChunkGetUri.c_str()});
REQUIRE(!Response.error);
@@ -417,6 +446,72 @@ TEST_CASE("project.basic")
CHECK(Response.text.size() == 10);
}
+ {
+ zen::StringBuilder<128> BatchGetUri;
+ BatchGetUri << BaseUri << "/batch";
+
+ std::vector<RequestChunkEntry> Entries;
+ Entries.emplace_back(FileOids[0], 0, 0, uint64_t(-1));
+ Entries.emplace_back(FileOids[1], 1, 0, 3);
+
+ IoBuffer Payload = BuildChunkBatchRequest(Entries);
+
+ auto Response = cpr::Post(cpr::Url{BatchGetUri.c_str()},
+ cpr::Header{{"Accept", "application/x-ue-comp"}},
+ cpr::Body{(const char*)Payload.Data(), Payload.Size()});
+
+ REQUIRE(!Response.error);
+
+ IoBuffer ResponseData(IoBuffer::Wrap, Response.text.data(), Response.text.size());
+ std::vector<IoBuffer> Chunks = ParseChunkBatchResponse(ResponseData);
+
+ REQUIRE(Chunks.size() == 2);
+
+ for (int i = 0; i < ChunkCount; ++i)
+ {
+ IoHash RawHash;
+ uint64_t RawSize;
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Chunks[i]), RawHash, RawSize);
+ SharedBuffer Decompressed = Compressed.Decompress();
+
+ REQUIRE(!Decompressed.IsNull());
+
+ if (i == 1)
+ {
+ CHECK(std::memcmp(Decompressed.GetData(), AttachData, sizeof(AttachData)) == 0);
+ }
+ }
+ }
+
+ {
+ zen::StringBuilder<128> BatchGetUri;
+ BatchGetUri << BaseUri << "/batch";
+
+ std::vector<RequestChunkEntry> Entries;
+ Entries.emplace_back(FileOids[0], 0, 0, uint64_t(-1));
+ Entries.emplace_back(FileOids[1], 1, 0, 3);
+
+ IoBuffer Payload = BuildChunkBatchRequest(Entries);
+
+ auto Response = cpr::Post(cpr::Url{BatchGetUri.c_str()},
+ cpr::Header{{"Accept", "application/octet-stream"}},
+ cpr::Body{(const char*)Payload.Data(), Payload.Size()});
+
+ REQUIRE(!Response.error);
+
+ IoBuffer ResponseData(IoBuffer::Wrap, Response.text.data(), Response.text.size());
+ std::vector<IoBuffer> Chunks = ParseChunkBatchResponse(ResponseData);
+
+ REQUIRE(Chunks.size() == 2);
+
+ for (int i = 0; i < ChunkCount; ++i)
+ {
+ REQUIRE(Chunks[i].GetSize() > 0);
+ }
+
+ CHECK(std::memcmp(Chunks[1].GetData(), AttachData, sizeof(AttachData)) == 0);
+ }
+
ZEN_INFO("+++++++");
}
SUBCASE("build store op commit") { ZEN_INFO("-------"); }
diff --git a/src/zenserver/projectstore/httpprojectstore.cpp b/src/zenserver/projectstore/httpprojectstore.cpp
index 2954bcdc0..9c399f27a 100644
--- a/src/zenserver/projectstore/httpprojectstore.cpp
+++ b/src/zenserver/projectstore/httpprojectstore.cpp
@@ -544,29 +544,58 @@ HttpProjectService::HandleChunkBatchRequest(HttpRouterRequest& Req)
uint64_t ChunkSize;
};
+ ZenContentType AcceptType = HttpReq.AcceptContentType();
+
std::vector<IoBuffer> OutBlobs;
OutBlobs.emplace_back(sizeof(ResponseHeader) + RequestHdr.ChunkCount * sizeof(ResponseChunkEntry));
for (uint32_t ChunkIndex = 0; ChunkIndex < RequestHdr.ChunkCount; ++ChunkIndex)
{
const RequestChunkEntry& RequestedChunk = RequestedChunks[ChunkIndex];
IoBuffer FoundChunk = FoundLog->FindChunk(RequestedChunk.ChunkId);
- if (FoundChunk)
+ if (!FoundChunk)
+ {
+ continue;
+ }
+
+ ZenContentType ContentType = FoundChunk.GetContentType();
+
+ uint64_t Offset = RequestedChunk.Offset;
+ uint64_t Size = RequestedChunk.RequestBytes;
+
+ if (ContentType == ZenContentType::kCompressedBinary)
{
- if (RequestedChunk.Offset > 0 || RequestedChunk.RequestBytes < uint64_t(-1))
+ IoHash RawHash;
+ uint64_t RawSize;
+ CompressedBuffer Buffer = CompressedBuffer::FromCompressed(SharedBuffer(std::move(FoundChunk)), RawHash, RawSize);
+
+ Offset = std::clamp<uint64_t>(Offset, 0, RawSize);
+ Size = std::clamp<uint64_t>(Size, 0, RawSize - Offset);
+
+ if (AcceptType == ZenContentType::kCompressedBinary)
+ {
+ FoundChunk = Buffer.GetRange(Offset, Size).GetCompressed().Flatten().AsIoBuffer();
+ }
+ else
+ {
+ FoundChunk = Buffer.Decompress(Offset, Size).AsIoBuffer();
+ }
+ }
+ else
+ {
+ Offset = std::clamp<uint64_t>(Offset, 0, FoundChunk.GetSize());
+ Size = std::clamp<uint64_t>(Size, 0, FoundChunk.GetSize() - Offset);
+
+ if (AcceptType == ZenContentType::kCompressedBinary)
+ {
+ CompressedBuffer Buffer = CompressedBuffer::Compress(SharedBuffer(std::move(FoundChunk)));
+ FoundChunk = Buffer.GetRange(Offset, Size).GetCompressed().Flatten().AsIoBuffer();
+ }
+ else
{
- uint64_t Offset = RequestedChunk.Offset;
- if (Offset > FoundChunk.Size())
- {
- Offset = FoundChunk.Size();
- }
- uint64_t Size = RequestedChunk.RequestBytes;
- if ((Offset + Size) > FoundChunk.Size())
- {
- Size = FoundChunk.Size() - Offset;
- }
FoundChunk = IoBuffer(FoundChunk, Offset, Size);
}
}
+
OutBlobs.emplace_back(std::move(FoundChunk));
}
uint8_t* ResponsePtr = reinterpret_cast<uint8_t*>(OutBlobs[0].MutableData());
@@ -594,7 +623,10 @@ HttpProjectService::HandleChunkBatchRequest(HttpRouterRequest& Req)
ResponsePtr += sizeof(ResponseChunk);
}
std::erase_if(OutBlobs, [](IoBuffer Buffer) -> bool { return !Buffer; });
- return HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kBinary, OutBlobs);
+ return HttpReq.WriteResponse(
+ HttpResponseCode::OK,
+ AcceptType == ZenContentType::kCompressedBinary ? HttpContentType::kCompressedBinary : HttpContentType::kBinary,
+ OutBlobs);
}
void