aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPer Larsson <[email protected]>2021-10-29 11:14:12 +0200
committerPer Larsson <[email protected]>2021-10-29 11:14:12 +0200
commiteadca136e5db2895379dda0f1a66f019d3914a82 (patch)
tree3602079f557a811ee373648600c9beb8aa10a10a
parentFixed crash at startup when updating manifest. (diff)
downloadzen-eadca136e5db2895379dda0f1a66f019d3914a82.tar.xz
zen-eadca136e5db2895379dda0f1a66f019d3914a82.zip
First pass batch request.
-rw-r--r--zenserver-test/zenserver-test.cpp103
-rw-r--r--zenserver/cache/structuredcache.cpp72
-rw-r--r--zenserver/cache/structuredcache.h1
3 files changed, 176 insertions, 0 deletions
diff --git a/zenserver-test/zenserver-test.cpp b/zenserver-test/zenserver-test.cpp
index 23be9f729..639f40689 100644
--- a/zenserver-test/zenserver-test.cpp
+++ b/zenserver-test/zenserver-test.cpp
@@ -1880,6 +1880,109 @@ TEST_CASE("zcache.policy")
}
}
+TEST_CASE("zcache.batch")
+{
+ using namespace std::literals;
+
+ auto CreateCacheKey = [](uint32_t Id) -> zen::IoHash { return zen::IoHash::HashBuffer(&Id, sizeof(uint32_t)); };
+
+ auto CreateCacheRecord = [](uint32_t Key, size_t PayloadSize) -> zen::CbPackage {
+ std::vector<uint8_t> Data;
+ Data.resize(PayloadSize);
+ for (size_t Idx = 0; Idx < PayloadSize; ++Idx)
+ {
+ Data[Idx] = Idx % 255;
+ }
+
+ auto Compressed = zen::CompressedBuffer::Compress(SharedBuffer::MakeView(Data.data(), Data.size()));
+ zen::CbAttachment Attachment(Compressed);
+
+ zen::CbObjectWriter CacheRecord;
+ CacheRecord << "key" << Key << "data" << Attachment;
+
+ zen::CbPackage Package;
+ Package.SetObject(CacheRecord.Save());
+ Package.AddAttachment(Attachment);
+
+ return Package;
+ };
+
+ auto ToIoBuffer = [](zen::CbPackage Package) -> zen::IoBuffer {
+ zen::BinaryWriter MemStream;
+ Package.Save(MemStream);
+ return zen::IoBuffer(zen::IoBuffer::Clone, MemStream.Data(), MemStream.Size());
+ };
+
+ SUBCASE("get cache records")
+ {
+ std::filesystem::path TestDir = TestEnv.CreateNewTestDir();
+ const uint16_t PortNumber = 13337;
+ const auto BaseUri = "http://localhost:{}/z$"_format(PortNumber);
+
+ ZenServerInstance Inst(TestEnv);
+ Inst.SetTestDir(TestDir);
+ Inst.SpawnServer(PortNumber);
+ Inst.WaitUntilReady();
+
+ const std::string_view Bucket = "mybucket"sv;
+ const uint32_t BatchCount = 128;
+
+ // Create cachereords
+ {
+ for (uint32_t Key = 1; Key <= BatchCount; ++Key)
+ {
+ const IoHash CacheKey = CreateCacheKey(Key);
+ CbPackage CacheRecord = CreateCacheRecord(Key, 4096);
+ IoBuffer Payload = ToIoBuffer(CacheRecord);
+
+ cpr::Response Result = cpr::Put(cpr::Url{"{}/{}/{}"_format(BaseUri, Bucket, CacheKey)},
+ cpr::Body{(const char*)Payload.Data(), Payload.Size()},
+ cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}});
+
+ CHECK(Result.status_code == 201);
+ }
+ }
+
+ // Get all as a batch
+ {
+ CbObjectWriter BatchQuery;
+
+ BatchQuery.BeginArray("records"sv);
+ for (uint32_t Key = 1; Key <= BatchCount; ++Key)
+ {
+ const IoHash CacheKey = CreateCacheKey(Key);
+ BatchQuery.BeginObject();
+ BatchQuery << "bucket"sv << Bucket << "key" << CacheKey;
+ BatchQuery.EndObject();
+ }
+ BatchQuery.EndArray();
+
+ zen::BinaryWriter Payload;
+ BatchQuery.Save(Payload);
+
+ cpr::Response Result = cpr::Get(cpr::Url{"{}/$batch"_format(BaseUri)},
+ cpr::Header{{"Content-Type", "application/x-ue-cb"}, {"Accept", "application/x-ue-cbpkg"}},
+ cpr::Body{(const char*)Payload.GetData(), Payload.GetSize()});
+
+ CHECK(Result.status_code == 200);
+
+ zen::IoBuffer Response(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size());
+ zen::CbPackage Package;
+ const bool Ok = Package.TryLoad(Response);
+
+ CbObjectView BatchResponse = Package.GetObject();
+ for (uint32_t ExpectedKey = 1; CbFieldView ResponseView : BatchResponse["records"])
+ {
+ CbObjectView Response = ResponseView.AsObjectView();
+ const uint32_t Key = Response["key"sv].AsUInt32();
+ const IoHash Attachment = Response["data"sv].AsHash();
+ CHECK(Key == ExpectedKey);
+ ExpectedKey++;
+ }
+ }
+ }
+}
+
struct RemoteExecutionRequest
{
RemoteExecutionRequest(std::string_view Host, int Port, std::filesystem::path& TreePath)
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp
index 35cb02cbb..172e122e4 100644
--- a/zenserver/cache/structuredcache.cpp
+++ b/zenserver/cache/structuredcache.cpp
@@ -210,6 +210,13 @@ HttpStructuredCacheService::HandleRequest(HttpServerRequest& Request)
{
std::string_view Key = Request.RelativeUri();
+ if (Key == "$batch")
+ {
+ const auto QueryParams = Request.GetQueryParams();
+ CachePolicy Policy = ParseCachePolicy(QueryParams);
+ return HandleBatchRequest(Request, Policy);
+ }
+
if (std::all_of(begin(Key), end(Key), [](const char c) { return std::isalnum(c); }))
{
// Bucket reference
@@ -873,6 +880,71 @@ HttpStructuredCacheService::ValidateKeyUri(HttpServerRequest& Request, CacheRef&
}
void
+HttpStructuredCacheService::HandleBatchRequest(zen::HttpServerRequest& Request, CachePolicy Policy)
+{
+ ZEN_UNUSED(Policy);
+
+ switch (auto Verb = Request.RequestVerb())
+ {
+ using enum HttpVerb;
+
+ case kGet:
+ {
+ const HttpContentType ContentType = Request.RequestContentType();
+ const HttpContentType AcceptType = Request.AcceptContentType();
+
+ if (ContentType != HttpContentType::kCbObject || AcceptType != HttpContentType::kCbPackage)
+ {
+ return Request.WriteResponse(HttpResponseCode::BadRequest);
+ }
+
+ CbObject BatchRequest = zen::LoadCompactBinaryObject(Request.ReadPayload());
+
+ CbPackage Package;
+ CbObjectWriter BatchResponse;
+
+ BatchResponse.BeginArray("records"sv);
+
+ for (CbFieldView QueryView : BatchRequest["records"sv])
+ {
+ CbObjectView Query = QueryView.AsObjectView();
+ const std::string_view Bucket = Query["bucket"sv].AsString();
+ const IoHash CacheKey = Query["key"sv].AsHash();
+
+ ZenCacheValue CacheValue;
+ const bool Hit = m_CacheStore.Get(Bucket, CacheKey, CacheValue);
+
+ if (Hit)
+ {
+ CbObjectView CacheRecord(CacheValue.Value.Data());
+
+ CacheRecord.IterateAttachments([this, &Package](CbFieldView AttachmentHash) {
+ if (IoBuffer Chunk = m_CidStore.FindChunkByCid(AttachmentHash.AsHash()))
+ {
+ Package.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Chunk))));
+ }
+ });
+
+ BatchResponse << CacheRecord;
+ }
+ }
+
+ BatchResponse.EndArray();
+ Package.SetObject(BatchResponse.Save());
+
+ BinaryWriter MemStream;
+ Package.Save(MemStream);
+
+ Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize()));
+ }
+ break;
+ default:
+ Request.WriteResponse(HttpResponseCode::BadRequest);
+ break;
+ }
+}
+
+void
HttpStructuredCacheService::HandleStatsRequest(zen::HttpServerRequest& Request)
{
CbObjectWriter Cbo;
diff --git a/zenserver/cache/structuredcache.h b/zenserver/cache/structuredcache.h
index ad7253f79..d6b4944fd 100644
--- a/zenserver/cache/structuredcache.h
+++ b/zenserver/cache/structuredcache.h
@@ -89,6 +89,7 @@ private:
void HandleCachePayloadRequest(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy);
void HandleGetCachePayload(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy);
void HandlePutCachePayload(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy);
+ void HandleBatchRequest(zen::HttpServerRequest& Request, CachePolicy Policy);
void HandleCacheBucketRequest(zen::HttpServerRequest& Request, std::string_view Bucket);
virtual void HandleStatsRequest(zen::HttpServerRequest& Request) override;
virtual void HandleStatusRequest(zen::HttpServerRequest& Request) override;