aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authormattpetersepic <[email protected]>2022-01-27 09:16:05 -0700
committerGitHub <[email protected]>2022-01-27 09:16:05 -0700
commit9f60e3a53ea082e4dd2553d50c4c03fa9e9a42c8 (patch)
treeb304de1c944480adcd3fda442afa8734dd3cd589
parentFixed ZenServerEntry::*ListenPort compile errors (diff)
downloadzen-9f60e3a53ea082e4dd2553d50c4c03fa9e9a42c8.tar.xz
zen-9f60e3a53ea082e4dd2553d50c4c03fa9e9a42c8.zip
Add batched CacheRecord put rpc (#38)
* Add batched CacheRecord put rpc
-rw-r--r--zenserver-test/zenserver-test.cpp91
-rw-r--r--zenserver/cache/structuredcache.cpp195
-rw-r--r--zenserver/cache/structuredcache.h9
3 files changed, 248 insertions, 47 deletions
diff --git a/zenserver-test/zenserver-test.cpp b/zenserver-test/zenserver-test.cpp
index f51fd1504..85393aed2 100644
--- a/zenserver-test/zenserver-test.cpp
+++ b/zenserver-test/zenserver-test.cpp
@@ -1814,7 +1814,12 @@ TEST_CASE("zcache.rpc")
{
using namespace std::literals;
- auto CreateCacheRecord = [](const zen::CacheKey& CacheKey, size_t PayloadSize) -> zen::CbPackage {
+ auto AppendCacheRecord = [](CbPackage& Package,
+ CbWriter& Writer,
+ const zen::CacheKey& CacheKey,
+ size_t PayloadSize,
+ CachePolicy /* BatchDefaultPolicy */,
+ CachePolicy RecordPolicy) {
std::vector<uint8_t> Data;
Data.resize(PayloadSize);
for (size_t Idx = 0; Idx < PayloadSize; ++Idx)
@@ -1824,17 +1829,24 @@ TEST_CASE("zcache.rpc")
zen::CbAttachment Attachment(zen::CompressedBuffer::Compress(SharedBuffer::MakeView(Data.data(), Data.size())));
- zen::CbObjectWriter CacheRecord;
- CacheRecord.BeginObject("CacheKey"sv);
- CacheRecord << "Bucket"sv << CacheKey.Bucket << "Hash"sv << CacheKey.Hash;
- CacheRecord.EndObject();
- CacheRecord << "Data"sv << Attachment;
+ Writer.BeginObject();
+ {
+ Writer.BeginObject("Record"sv);
+ {
+ Writer.BeginObject("Key"sv);
+ {
+ Writer << "Bucket"sv << CacheKey.Bucket << "Hash"sv << CacheKey.Hash;
+ }
+ Writer.EndObject();
+ Writer << "Data"sv << Attachment;
+ }
+ Writer.EndObject();
+ Writer.SetName("Policy"sv);
+ CacheRecordPolicy(RecordPolicy).Save(Writer);
+ }
+ Writer.EndObject();
- zen::CbPackage Package;
- Package.SetObject(CacheRecord.Save());
Package.AddAttachment(Attachment);
-
- return Package;
};
auto ToIoBuffer = [](zen::CbPackage Package) -> zen::IoBuffer {
@@ -1843,27 +1855,46 @@ TEST_CASE("zcache.rpc")
return zen::IoBuffer(zen::IoBuffer::Clone, MemStream.Data(), MemStream.Size());
};
- auto PutCacheRecords = [&CreateCacheRecord, &ToIoBuffer](std::string_view BaseUri,
- std::string_view Query,
- std::string_view Bucket,
- size_t Num,
- size_t PayloadSize = 1024) -> std::vector<CacheKey> {
+ auto PutCacheRecords =
+ [&AppendCacheRecord,
+ &ToIoBuffer](std::string_view BaseUri, std::string_view Bucket, size_t Num, size_t PayloadSize = 1024) -> std::vector<CacheKey> {
std::vector<zen::CacheKey> OutKeys;
for (uint32_t Key = 1; Key <= Num; ++Key)
{
- const zen::CacheKey CacheKey = zen::CacheKey::Create(Bucket, zen::IoHash::HashBuffer(&Key, sizeof(uint32_t)));
- CbPackage CacheRecord = CreateCacheRecord(CacheKey, PayloadSize);
+ zen::IoHash KeyHash;
+ ((uint32_t*)(KeyHash.Hash))[0] = Key;
+ const zen::CacheKey CacheKey = zen::CacheKey::Create(Bucket, KeyHash);
+ CbPackage Package;
+ CbWriter Writer;
- OutKeys.push_back(CacheKey);
+ Writer.BeginObject();
+ {
+ Writer << "Method"sv
+ << "PutCacheRecords"sv;
+ Writer.BeginObject("Params"sv);
+ {
+ CachePolicy BatchDefaultPolicy = CachePolicy::Default;
+ Writer << "DefaultPolicy"sv << WriteToString<128>(BatchDefaultPolicy);
+ Writer.BeginArray("Requests"sv);
+ {
+ AppendCacheRecord(Package, Writer, CacheKey, PayloadSize, BatchDefaultPolicy, CachePolicy::Default);
+ }
+ Writer.EndArray();
+ }
+ Writer.EndObject();
+ }
+ Writer.EndObject();
+ Package.SetObject(Writer.Save().AsObject());
- IoBuffer Payload = ToIoBuffer(CacheRecord);
+ OutKeys.push_back(CacheKey);
- cpr::Response Result = cpr::Put(cpr::Url{fmt::format("{}/{}/{}{}", BaseUri, CacheKey.Bucket, CacheKey.Hash, Query)},
- cpr::Body{(const char*)Payload.Data(), Payload.Size()},
- cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}});
+ IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer();
+ cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", BaseUri)},
+ cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}},
+ cpr::Body{(const char*)Body.GetData(), Body.GetSize()});
- CHECK(Result.status_code == 201);
+ CHECK(Result.status_code == 200);
}
return OutKeys;
@@ -1948,7 +1979,7 @@ TEST_CASE("zcache.rpc")
Inst.WaitUntilReady();
CacheRecordPolicy Policy;
- std::vector<zen::CacheKey> Keys = PutCacheRecords(BaseUri, ""sv, "mastodon"sv, 128);
+ std::vector<zen::CacheKey> Keys = PutCacheRecords(BaseUri, "mastodon"sv, 128);
GetCacheRecordResult Result = GetCacheRecords(BaseUri, Keys, Policy);
CHECK(Result.Records.size() == Keys.size());
@@ -1958,7 +1989,7 @@ TEST_CASE("zcache.rpc")
const CacheKey& ExpectedKey = Keys[Index++];
CbObjectView RecordObj = RecordView.AsObjectView();
- CbObjectView KeyObj = RecordObj["CacheKey"sv].AsObjectView();
+ CbObjectView KeyObj = RecordObj["Key"sv].AsObjectView();
const CacheKey Key = CacheKey::Create(KeyObj["Bucket"sv].AsString(), KeyObj["Hash"].AsHash());
const IoHash AttachmentHash = RecordObj["Data"sv].AsHash();
const CbAttachment* Attachment = Result.Response.FindAttachment(AttachmentHash);
@@ -1980,7 +2011,7 @@ TEST_CASE("zcache.rpc")
Inst.WaitUntilReady();
CacheRecordPolicy Policy;
- std::vector<zen::CacheKey> ExistingKeys = PutCacheRecords(BaseUri, ""sv, "mastodon"sv, 128);
+ std::vector<zen::CacheKey> ExistingKeys = PutCacheRecords(BaseUri, "mastodon"sv, 128);
std::vector<zen::CacheKey> Keys;
for (const zen::CacheKey& Key : ExistingKeys)
@@ -2006,7 +2037,7 @@ TEST_CASE("zcache.rpc")
{
const CacheKey& ExpectedKey = ExistingKeys[KeyIndex++];
CbObjectView RecordObj = RecordView.AsObjectView();
- zen::CacheKey Key = LoadKey(RecordObj["CacheKey"sv]);
+ zen::CacheKey Key = LoadKey(RecordObj["Key"sv]);
const IoHash AttachmentHash = RecordObj["Data"sv].AsHash();
const CbAttachment* Attachment = Result.Response.FindAttachment(AttachmentHash);
@@ -2028,7 +2059,7 @@ TEST_CASE("zcache.rpc")
SpawnServer(UpstreamServer, UpstreamCfg);
SpawnServer(LocalServer, LocalCfg);
- std::vector<zen::CacheKey> Keys = PutCacheRecords(UpstreamCfg.BaseUri, ""sv, "mastodon"sv, 4);
+ std::vector<zen::CacheKey> Keys = PutCacheRecords(UpstreamCfg.BaseUri, "mastodon"sv, 4);
CacheRecordPolicy Policy(CachePolicy::QueryLocal);
GetCacheRecordResult Result = GetCacheRecords(LocalCfg.BaseUri, Keys, Policy);
@@ -2053,7 +2084,7 @@ TEST_CASE("zcache.rpc")
SpawnServer(UpstreamServer, UpstreamCfg);
SpawnServer(LocalServer, LocalCfg);
- std::vector<zen::CacheKey> Keys = PutCacheRecords(UpstreamCfg.BaseUri, ""sv, "mastodon"sv, 4);
+ std::vector<zen::CacheKey> Keys = PutCacheRecords(UpstreamCfg.BaseUri, "mastodon"sv, 4);
CacheRecordPolicy Policy(CachePolicy::QueryLocal | CachePolicy::QueryRemote);
GetCacheRecordResult Result = GetCacheRecords(LocalCfg.BaseUri, Keys, Policy);
@@ -2064,7 +2095,7 @@ TEST_CASE("zcache.rpc")
{
const zen::CacheKey& ExpectedKey = Keys[Index++];
CbObjectView RecordObj = RecordView.AsObjectView();
- zen::CacheKey Key = LoadKey(RecordObj["CacheKey"sv]);
+ zen::CacheKey Key = LoadKey(RecordObj["Key"sv]);
CHECK(Key == ExpectedKey);
}
}
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp
index 0f385116b..cb29b3645 100644
--- a/zenserver/cache/structuredcache.cpp
+++ b/zenserver/cache/structuredcache.cpp
@@ -15,6 +15,7 @@
#include <zencore/timer.h>
#include <zencore/trace.h>
#include <zenhttp/httpserver.h>
+#include <zenhttp/httpshared.h>
#include <zenstore/cas.h>
#include <zenutil/cache/cache.h>
@@ -55,6 +56,13 @@ struct AttachmentCount
uint32_t Total = 0;
};
+struct PutRequestData
+{
+ CacheKey Key;
+ CbObjectView RecordObject;
+ CacheRecordPolicy Policy;
+};
+
//////////////////////////////////////////////////////////////////////////
HttpStructuredCacheService::HttpStructuredCacheService(ZenCacheStore& InCacheStore,
@@ -787,27 +795,44 @@ HttpStructuredCacheService::HandleRpcRequest(zen::HttpServerRequest& Request)
const HttpContentType ContentType = Request.RequestContentType();
const HttpContentType AcceptType = Request.AcceptContentType();
- if (ContentType != HttpContentType::kCbObject || AcceptType != HttpContentType::kCbPackage)
+ if ((ContentType != HttpContentType::kCbObject && ContentType != HttpContentType::kCbPackage) ||
+ AcceptType != HttpContentType::kCbPackage)
{
return Request.WriteResponse(HttpResponseCode::BadRequest);
}
- Request.WriteResponseAsync(
- [this, RpcRequest = zen::LoadCompactBinaryObject(Request.ReadPayload())](HttpServerRequest& AsyncRequest) {
- const std::string_view Method = RpcRequest["Method"sv].AsString();
- if (Method == "GetCacheRecords"sv)
- {
- HandleRpcGetCacheRecords(AsyncRequest, RpcRequest);
- }
- else if (Method == "GetCacheValues"sv)
- {
- HandleRpcGetCachePayloads(AsyncRequest, RpcRequest);
- }
- else
- {
- AsyncRequest.WriteResponse(HttpResponseCode::BadRequest);
- }
- });
+ Request.WriteResponseAsync([this, Body = Request.ReadPayload(), ContentType](HttpServerRequest& AsyncRequest) mutable {
+ CbPackage Package;
+ CbObjectView Object;
+ CbObject ObjectBuffer;
+ if (ContentType == HttpContentType::kCbObject)
+ {
+ ObjectBuffer = zen::LoadCompactBinaryObject(std::move(Body));
+ Object = ObjectBuffer;
+ }
+ else
+ {
+ Package = ParsePackageMessage(Body);
+ Object = Package.GetObject();
+ }
+ const std::string_view Method = Object["Method"sv].AsString();
+ if (Method == "PutCacheRecords"sv)
+ {
+ HandleRpcPutCacheRecords(AsyncRequest, Package);
+ }
+ else if (Method == "GetCacheRecords"sv)
+ {
+ HandleRpcGetCacheRecords(AsyncRequest, Object);
+ }
+ else if (Method == "GetCacheValues"sv)
+ {
+ HandleRpcGetCachePayloads(AsyncRequest, Object);
+ }
+ else
+ {
+ AsyncRequest.WriteResponse(HttpResponseCode::BadRequest);
+ }
+ });
}
break;
default:
@@ -817,6 +842,142 @@ HttpStructuredCacheService::HandleRpcRequest(zen::HttpServerRequest& Request)
}
void
+HttpStructuredCacheService::HandleRpcPutCacheRecords(zen::HttpServerRequest& Request, const CbPackage& BatchRequest)
+{
+ ZEN_TRACE_CPU("Z$::RpcPutCacheRecords");
+ CbObjectView BatchObject = BatchRequest.GetObject();
+
+ CbObjectView Params = BatchObject["Params"sv].AsObjectView();
+ CachePolicy DefaultPolicy;
+
+ ZEN_ASSERT(BatchObject["Method"sv].AsString() == "PutCacheRecords"sv);
+
+ std::string_view PolicyText = Params["DefaultPolicy"].AsString();
+ DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default;
+ std::vector<bool> Results;
+ for (CbFieldView RequestField : Params["Requests"sv])
+ {
+ CbObjectView RequestObject = RequestField.AsObjectView();
+ CbObjectView RecordObject = RequestObject["Record"sv].AsObjectView();
+ CbObjectView KeyView = RecordObject["Key"sv].AsObjectView();
+ CbFieldView BucketField = KeyView["Bucket"sv];
+ CbFieldView HashField = KeyView["Hash"sv];
+ CacheKey Key = CacheKey::Create(BucketField.AsString(), HashField.AsHash());
+ if (BucketField.HasError() || HashField.HasError() || Key.Bucket.empty())
+ {
+ return Request.WriteResponse(HttpResponseCode::BadRequest);
+ }
+ CacheRecordPolicy Policy = CacheRecordPolicy::Load(RequestObject["Policy"sv].AsObjectView(), DefaultPolicy);
+ PutRequestData PutRequest{std::move(Key), RecordObject, std::move(Policy)};
+
+ PutResult Result = PutCacheRecord(PutRequest, &BatchRequest);
+
+ if (Result == PutResult::Invalid)
+ {
+ return Request.WriteResponse(HttpResponseCode::BadRequest);
+ }
+ Results.push_back(Result == PutResult::Success);
+ }
+ if (Results.empty())
+ {
+ return Request.WriteResponse(HttpResponseCode::BadRequest);
+ }
+
+ CbObjectWriter ResponseObject;
+ ResponseObject.BeginArray("Result"sv);
+ for (bool Value : Results)
+ {
+ ResponseObject.AddBool(Value);
+ }
+ ResponseObject.EndArray();
+
+ CbPackage RpcResponse;
+ RpcResponse.SetObject(ResponseObject.Save());
+
+ BinaryWriter MemStream;
+ RpcResponse.Save(MemStream);
+
+ Request.WriteResponse(HttpResponseCode::OK,
+ HttpContentType::kCbPackage,
+ IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize()));
+}
+
+HttpStructuredCacheService::PutResult
+HttpStructuredCacheService::PutCacheRecord(PutRequestData& Request, const CbPackage* Package)
+{
+ std::vector<IoHash> ValidAttachments;
+ AttachmentCount Count;
+ CbObjectView Record = Request.RecordObject;
+ uint64_t RecordObjectSize = Record.GetSize();
+ uint64_t TransferredSize = RecordObjectSize;
+
+ Request.RecordObject.IterateAttachments([this, &Request, Package, &ValidAttachments, &Count, &TransferredSize](CbFieldView HashView) {
+ const IoHash ValueHash = HashView.AsHash();
+ if (const CbAttachment* Attachment = Package ? Package->FindAttachment(ValueHash) : nullptr)
+ {
+ if (Attachment->IsCompressedBinary())
+ {
+ CompressedBuffer Chunk = Attachment->AsCompressedBinary();
+ CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Chunk);
+
+ ValidAttachments.emplace_back(InsertResult.DecompressedId);
+
+ if (InsertResult.New)
+ {
+ Count.New++;
+ }
+ Count.Valid++;
+ TransferredSize += Chunk.GetCompressedSize();
+ }
+ else
+ {
+ ZEN_WARN("PUT - '{}/{}' '{}' FAILED, attachment '{}' is not compressed",
+ Request.Key.Bucket,
+ Request.Key.Hash,
+ ToString(HttpContentType::kCbPackage),
+ ValueHash);
+ Count.Invalid++;
+ }
+ }
+ else if (m_CidStore.ContainsChunk(ValueHash))
+ {
+ ValidAttachments.emplace_back(ValueHash);
+ Count.Valid++;
+ }
+ Count.Total++;
+ });
+
+ if (Count.Invalid > 0)
+ {
+ return PutResult::Invalid;
+ }
+
+ ZEN_DEBUG("PUT - '{}/{}' {}, attachments '{}/{}/{}' (new/valid/total)",
+ Request.Key.Bucket,
+ Request.Key.Hash,
+ NiceBytes(TransferredSize),
+ Count.New,
+ Count.Valid,
+ Count.Total);
+
+ ZenCacheValue CacheValue;
+ MemoryView RecordView = Record.GetView();
+ CacheValue.Value = IoBuffer(Record.GetSize());
+ Record.CopyTo(MutableMemoryView(CacheValue.Value.MutableData(), CacheValue.Value.GetSize()));
+ CacheValue.Value.SetContentType(ZenContentType::kCbObject);
+ m_CacheStore.Put(Request.Key.Bucket, Request.Key.Hash, CacheValue);
+
+ const bool IsPartialRecord = Count.Valid != Count.Total;
+
+ if (EnumHasAllFlags(Request.Policy.GetRecordPolicy(), CachePolicy::StoreRemote) && !IsPartialRecord)
+ {
+ m_UpstreamCache.EnqueueUpstream(
+ {.Type = ZenContentType::kCbPackage, .Key = Request.Key, .PayloadIds = std::move(ValidAttachments)});
+ }
+ return PutResult::Success;
+}
+
+void
HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Request, CbObjectView RpcRequest)
{
ZEN_TRACE_CPU("Z$::RpcGetCacheRecords");
diff --git a/zenserver/cache/structuredcache.h b/zenserver/cache/structuredcache.h
index a7ecba845..5b6b10898 100644
--- a/zenserver/cache/structuredcache.h
+++ b/zenserver/cache/structuredcache.h
@@ -19,6 +19,7 @@ namespace zen {
class CasStore;
class CidStore;
class CbObjectView;
+struct PutRequestData;
class ScrubContext;
class UpstreamCache;
class ZenCacheStore;
@@ -82,6 +83,12 @@ private:
std::atomic_uint64_t UpstreamHitCount{};
std::atomic_uint64_t MissCount{};
};
+ enum class PutResult
+ {
+ Success,
+ Fail,
+ Invalid,
+ };
[[nodiscard]] bool ValidateKeyUri(zen::HttpServerRequest& Request, CacheRef& OutRef);
void HandleCacheRecordRequest(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL);
@@ -91,11 +98,13 @@ private:
void HandleGetCachePayload(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL);
void HandlePutCachePayload(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL);
void HandleRpcRequest(zen::HttpServerRequest& Request);
+ void HandleRpcPutCacheRecords(zen::HttpServerRequest& Request, const CbPackage& BatchRequest);
void HandleRpcGetCacheRecords(zen::HttpServerRequest& Request, CbObjectView BatchRequest);
void HandleRpcGetCachePayloads(zen::HttpServerRequest& Request, CbObjectView BatchRequest);
void HandleCacheBucketRequest(zen::HttpServerRequest& Request, std::string_view Bucket);
virtual void HandleStatsRequest(zen::HttpServerRequest& Request) override;
virtual void HandleStatusRequest(zen::HttpServerRequest& Request) override;
+ PutResult PutCacheRecord(PutRequestData& Request, const CbPackage* Package);
spdlog::logger& Log() { return m_Log; }
spdlog::logger& m_Log;