aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPer Larsson <[email protected]>2021-11-08 16:00:10 +0100
committerPer Larsson <[email protected]>2021-11-08 16:00:10 +0100
commit4f503210dca72fdaeee61693626ef6085e93e030 (patch)
tree1ca445c85a0a4c30459fe73df6245c5c98d4e9a6
parentAdded upstream batch API. (diff)
downloadzen-4f503210dca72fdaeee61693626ef6085e93e030.tar.xz
zen-4f503210dca72fdaeee61693626ef6085e93e030.zip
Added batched get chunk(s).
-rw-r--r--zenserver-test/zenserver-test.cpp26
-rw-r--r--zenserver/cache/cachekey.h18
-rw-r--r--zenserver/cache/structuredcache.cpp353
-rw-r--r--zenserver/cache/structuredcache.h2
4 files changed, 276 insertions, 123 deletions
diff --git a/zenserver-test/zenserver-test.cpp b/zenserver-test/zenserver-test.cpp
index 70515770c..7b9b91466 100644
--- a/zenserver-test/zenserver-test.cpp
+++ b/zenserver-test/zenserver-test.cpp
@@ -1953,7 +1953,7 @@ TEST_CASE("zcache.batch")
CbObjectWriter BatchRequest;
BatchRequest << "method"
- << "getcacherecords";
+ << "get-cache-records";
BatchRequest.BeginObject("params");
BatchRequest.BeginArray("cachekeys"sv);
@@ -1962,7 +1962,7 @@ TEST_CASE("zcache.batch")
{
const IoHash CacheKey = CreateCacheKey(Key);
BatchRequest.BeginObject();
- BatchRequest << "bucket"sv << Bucket << "key" << CacheKey;
+ BatchRequest << "bucket"sv << Bucket << "hash" << CacheKey;
BatchRequest.EndObject();
}
@@ -1972,9 +1972,9 @@ TEST_CASE("zcache.batch")
zen::BinaryWriter Payload;
BatchRequest.Save(Payload);
- cpr::Response Result = cpr::Get(cpr::Url{"{}/$batch"_format(BaseUri)},
- cpr::Header{{"Content-Type", "application/x-ue-cb"}, {"Accept", "application/x-ue-cbpkg"}},
- cpr::Body{(const char*)Payload.GetData(), Payload.GetSize()});
+ cpr::Response Result = cpr::Post(cpr::Url{"{}/$batch"_format(BaseUri)},
+ cpr::Header{{"Content-Type", "application/x-ue-cb"}, {"Accept", "application/x-ue-cbpkg"}},
+ cpr::Body{(const char*)Payload.GetData(), Payload.GetSize()});
CHECK(Result.status_code == 200);
@@ -2037,22 +2037,22 @@ TEST_CASE("zcache.batch")
CbObjectWriter BatchRequest;
BatchRequest << "method"
- << "getcacherecords";
+ << "get-cache-records";
BatchRequest.BeginObject("params");
BatchRequest.BeginArray("cachekeys"sv);
BatchRequest.BeginObject();
- BatchRequest << "bucket"sv << Bucket << "key" << CreateCacheKey(1);
+ BatchRequest << "bucket"sv << Bucket << "hash" << CreateCacheKey(1);
BatchRequest.EndObject();
BatchRequest.BeginObject();
- BatchRequest << "bucket"sv << Bucket << "key" << CreateCacheKey(11); // Missing
+ BatchRequest << "bucket"sv << Bucket << "hash" << CreateCacheKey(11); // Missing
BatchRequest.EndObject();
BatchRequest.BeginObject();
- BatchRequest << "bucket"sv << Bucket << "key" << CreateCacheKey(2);
+ BatchRequest << "bucket"sv << Bucket << "hash" << CreateCacheKey(2);
BatchRequest.EndObject();
BatchRequest.BeginObject();
- BatchRequest << "bucket"sv << Bucket << "key" << CreateCacheKey(22); // Missing
+ BatchRequest << "bucket"sv << Bucket << "hash" << CreateCacheKey(22); // Missing
BatchRequest.EndObject();
BatchRequest.EndArray();
@@ -2061,9 +2061,9 @@ TEST_CASE("zcache.batch")
zen::BinaryWriter Body;
BatchRequest.Save(Body);
- cpr::Response Result = cpr::Get(cpr::Url{"{}/$batch"_format(BaseUri)},
- cpr::Header{{"Content-Type", "application/x-ue-cb"}, {"Accept", "application/x-ue-cbpkg"}},
- cpr::Body{(const char*)Body.GetData(), Body.GetSize()});
+ cpr::Response Result = cpr::Post(cpr::Url{"{}/$batch"_format(BaseUri)},
+ cpr::Header{{"Content-Type", "application/x-ue-cb"}, {"Accept", "application/x-ue-cbpkg"}},
+ cpr::Body{(const char*)Body.GetData(), Body.GetSize()});
CHECK(Result.status_code == 200);
diff --git a/zenserver/cache/cachekey.h b/zenserver/cache/cachekey.h
index eba063699..012a01292 100644
--- a/zenserver/cache/cachekey.h
+++ b/zenserver/cache/cachekey.h
@@ -4,6 +4,7 @@
#include <zencore/iohash.h>
#include <zencore/string.h>
+#include <zencore/uid.h>
#include <gsl/gsl-lite.hpp>
namespace zen {
@@ -40,13 +41,18 @@ struct CacheKey
{
std::string Bucket;
IoHash Hash;
-
- static CacheKey Create(std::string_view Bucket, const IoHash& Hash)
- {
- return {.Bucket = ToLower(Bucket), .Hash = Hash};
- }
-
+
+ static CacheKey Create(std::string_view Bucket, const IoHash& Hash) { return {.Bucket = ToLower(Bucket), .Hash = Hash}; }
+
static CacheKey None;
};
+struct CacheChunk
+{
+ CacheKey Key;
+ IoHash Id;
+ uint64_t RawOffset = 0ull;
+ uint64_t RawSize = ~uint64_t(0);
+};
+
} // namespace zen
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp
index 59cc74d53..a1b0e1549 100644
--- a/zenserver/cache/structuredcache.cpp
+++ b/zenserver/cache/structuredcache.cpp
@@ -231,8 +231,8 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request
if (ValidCount != AttachmentCount)
{
- Success = false;
- ZEN_WARN("GET - '{}/{}' '{}' FAILED, found '{}' of '{}' attachments",
+ // Success = false;
+ ZEN_WARN("GET - '{}/{}' '{}' is partial, found '{}' of '{}' attachments",
Ref.BucketSegment,
Ref.HashKey,
ToString(AcceptType),
@@ -554,12 +554,12 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request
}
});
- const bool AttachmentsValid = ValidAttachments.size() == Attachments.size();
+ const bool IsPartialCacheRecord = ValidAttachments.size() != Attachments.size();
- if (!AttachmentsValid)
- {
- return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid attachments"sv);
- }
+ // if (!AttachmentsValid)
+ //{
+ // return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid attachments"sv);
+ //}
ZEN_DEBUG("PUT - '{}/{}' {} '{}', {}/{} new attachments",
Ref.BucketSegment,
@@ -574,7 +574,7 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request
m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, {.Value = CacheRecord.GetBuffer().AsIoBuffer()});
- if (StoreUpstream)
+ if (StoreUpstream && !IsPartialCacheRecord)
{
ZEN_ASSERT(m_UpstreamCache);
auto Result = m_UpstreamCache->EnqueueUpstream({.Type = ZenContentType::kCbPackage,
@@ -786,15 +786,11 @@ HttpStructuredCacheService::HandleBatchRequest(zen::HttpServerRequest& Request,
{
using namespace fmt::literals;
- const bool SkipData = (Policy & CachePolicy::SkipData) == CachePolicy::SkipData;
- const bool SkipAttachments = (Policy & CachePolicy::SkipAttachments) == CachePolicy::SkipAttachments;
- const bool QueryUpstream = m_UpstreamCache && ((Policy & CachePolicy::QueryRemote) == CachePolicy::QueryRemote);
-
switch (auto Verb = Request.RequestVerb())
{
using enum HttpVerb;
- case kGet:
+ case kPost:
{
const HttpContentType ContentType = Request.RequestContentType();
const HttpContentType AcceptType = Request.AcceptContentType();
@@ -807,122 +803,271 @@ HttpStructuredCacheService::HandleBatchRequest(zen::HttpServerRequest& Request,
CbObject BatchRequest = zen::LoadCompactBinaryObject(Request.ReadPayload());
const std::string_view Method = BatchRequest["method"sv].AsString();
- if (Method != "getcacherecords"sv)
+ if (Method == "get-cache-records"sv)
{
- return Request.WriteResponse(HttpResponseCode::BadRequest,
- HttpContentType::kText,
- "Invalid method '{}'"_format(Method));
+ HandleBatchGetCacheRecords(Request, BatchRequest, Policy);
}
+ else if (Method == "get-cache-chunks"sv)
+ {
+ HandleBatchGetCacheChunks(Request, BatchRequest, Policy);
+ }
+ else
+ {
+ Request.WriteResponse(HttpResponseCode::BadRequest);
+ }
+ }
+ break;
+ default:
+ Request.WriteResponse(HttpResponseCode::BadRequest);
+ break;
+ }
+}
- CbObjectView Params = BatchRequest["params"sv].AsObjectView();
+void
+HttpStructuredCacheService::HandleBatchGetCacheRecords(zen::HttpServerRequest& Request, CbObjectView BatchRequest, CachePolicy Policy)
+{
+ using namespace fmt::literals;
- std::vector<CacheKey> CacheKeys;
- std::vector<IoBuffer> CacheValues;
- std::vector<IoBuffer> Payloads;
- std::vector<size_t> Missing;
+ const bool SkipData = (Policy & CachePolicy::SkipData) == CachePolicy::SkipData;
+ const bool SkipAttachments = (Policy & CachePolicy::SkipAttachments) == CachePolicy::SkipAttachments;
+ const bool QueryUpstream = m_UpstreamCache && ((Policy & CachePolicy::QueryRemote) == CachePolicy::QueryRemote);
- for (CbFieldView QueryView : Params["cachekeys"sv])
- {
- CbObjectView Query = QueryView.AsObjectView();
- CacheKeys.push_back(CacheKey::Create(Query["bucket"sv].AsString(), Query["key"sv].AsHash()));
- }
+ const std::string_view Method = BatchRequest["method"sv].AsString();
+ ZEN_ASSERT(Method == "get-cache-records"sv);
- CacheValues.resize(CacheKeys.size());
+ CbObjectView Params = BatchRequest["params"sv].AsObjectView();
- for (size_t Idx = 0; const CacheKey& Key : CacheKeys)
- {
- ZenCacheValue CacheValue;
- if (m_CacheStore.Get(Key.Bucket, Key.Hash, CacheValue))
- {
- CbObjectView CacheRecord(CacheValue.Value.Data());
+ std::vector<CacheKey> CacheKeys;
+ std::vector<IoBuffer> CacheValues;
+ std::vector<IoBuffer> Payloads;
+ std::vector<size_t> Missing;
- if (!SkipAttachments)
- {
- CacheRecord.IterateAttachments([this, &Payloads](CbFieldView AttachmentHash) {
- if (IoBuffer Chunk = m_CidStore.FindChunkByCid(AttachmentHash.AsHash()))
- {
- Payloads.push_back(Chunk);
- }
- });
- }
+ for (CbFieldView QueryView : Params["cachekeys"sv])
+ {
+ CbObjectView Query = QueryView.AsObjectView();
+ CacheKeys.push_back(CacheKey::Create(Query["bucket"sv].AsString(), Query["hash"sv].AsHash()));
+ }
- CacheValues[Idx] = CacheValue.Value;
- }
- else
+ CacheValues.resize(CacheKeys.size());
+
+ for (size_t Idx = 0; const CacheKey& Key : CacheKeys)
+ {
+ ZenCacheValue CacheValue;
+ if (m_CacheStore.Get(Key.Bucket, Key.Hash, CacheValue))
+ {
+ CbObjectView CacheRecord(CacheValue.Value.Data());
+
+ if (!SkipAttachments)
+ {
+ CacheRecord.IterateAttachments([this, &Payloads](CbFieldView AttachmentHash) {
+ if (IoBuffer Chunk = m_CidStore.FindChunkByCid(AttachmentHash.AsHash()))
{
- Missing.push_back(Idx);
+ Payloads.push_back(Chunk);
}
+ });
+ }
- ++Idx;
- }
+ CacheValues[Idx] = CacheValue.Value;
+ }
+ else
+ {
+ Missing.push_back(Idx);
+ }
+
+ ++Idx;
+ }
- if (QueryUpstream)
+ if (!Missing.empty() && QueryUpstream)
+ {
+ auto UpstreamResult = m_UpstreamCache->GetCacheRecords(
+ CacheKeys,
+ Missing,
+ [this, &CacheValues, &Payloads, SkipAttachments](size_t KeyIndex, IoBuffer UpstreamValue) {
+ CbPackage UpstreamPackage;
+ if (UpstreamValue && UpstreamPackage.TryLoad(UpstreamValue))
{
- auto UpstreamResult = m_UpstreamCache->GetCacheRecords(
- CacheKeys,
- Missing,
- [this, &CacheValues, &Payloads, SkipAttachments](size_t KeyIndex, IoBuffer UpstreamValue) {
- CbPackage UpstreamPackage;
- if (UpstreamValue && UpstreamPackage.TryLoad(UpstreamValue))
+ CbObjectView CacheRecord = UpstreamPackage.GetObject();
+
+ CacheRecord.IterateAttachments([&](CbFieldView AttachmentHash) {
+ if (const CbAttachment* Attachment = UpstreamPackage.FindAttachment(AttachmentHash.AsHash()))
+ {
+ if (CompressedBuffer Chunk = Attachment->AsCompressedBinary())
{
- CbObjectView CacheRecord = UpstreamPackage.GetObject();
-
- CacheRecord.IterateAttachments([&](CbFieldView AttachmentHash) {
- if (const CbAttachment* Attachment = UpstreamPackage.FindAttachment(AttachmentHash.AsHash()))
- {
- if (CompressedBuffer Chunk = Attachment->AsCompressedBinary())
- {
- m_CidStore.AddChunk(Chunk);
-
- if (!SkipAttachments)
- {
- Payloads.push_back(Chunk.GetCompressed().Flatten().AsIoBuffer());
- }
- }
- }
- });
-
- CacheValues[KeyIndex] = IoBufferBuilder::MakeCloneFromMemory(CacheRecord.GetView());
+ m_CidStore.AddChunk(Chunk);
+
+ if (!SkipAttachments)
+ {
+ Payloads.push_back(Chunk.GetCompressed().Flatten().AsIoBuffer());
+ }
}
- });
+ }
+ });
+
+ CacheValues[KeyIndex] = IoBufferBuilder::MakeCloneFromMemory(CacheRecord.GetView());
}
+ });
+ }
- CbObjectWriter BatchResponse;
+ CbObjectWriter BatchResponse;
- BatchResponse.BeginArray("result"sv);
- for (const IoBuffer& Value : CacheValues)
- {
- if (Value)
- {
- BatchResponse << CbObjectView(Value.Data());
- }
- else
- {
- BatchResponse.AddNull();
- }
- }
- BatchResponse.EndArray();
+ BatchResponse.BeginArray("result"sv);
+ for (const IoBuffer& Value : CacheValues)
+ {
+ if (Value)
+ {
+ CbObjectView Record(Value.Data());
+ BatchResponse << Record;
+ }
+ else
+ {
+ BatchResponse.AddNull();
+ }
+ }
+ BatchResponse.EndArray();
- CbPackage Package;
- Package.SetObject(BatchResponse.Save());
+ CbPackage Package;
+ Package.SetObject(BatchResponse.Save());
- for (const IoBuffer& Payload : Payloads)
- {
- Package.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Payload))));
- }
+ for (const IoBuffer& Payload : Payloads)
+ {
+ Package.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Payload))));
+ }
+
+ BinaryWriter MemStream;
+ Package.Save(MemStream);
+
+ Request.WriteResponse(HttpResponseCode::OK,
+ HttpContentType::kCbPackage,
+ IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize()));
+}
+
+void
+HttpStructuredCacheService::HandleBatchGetCacheChunks(zen::HttpServerRequest& Request, CbObjectView BatchRequest, CachePolicy Policy)
+{
+ using namespace fmt::literals;
- BinaryWriter MemStream;
- Package.Save(MemStream);
+ const bool SkipData = (Policy & CachePolicy::SkipData) == CachePolicy::SkipData;
+ const bool SkipAttachments = (Policy & CachePolicy::SkipAttachments) == CachePolicy::SkipAttachments;
+ const bool QueryUpstream = m_UpstreamCache && ((Policy & CachePolicy::QueryRemote) == CachePolicy::QueryRemote);
+
+ const std::string_view Method = BatchRequest["method"sv].AsString();
+ ZEN_ASSERT(Method == "get-cache-chunks"sv);
- Request.WriteResponse(HttpResponseCode::OK,
- HttpContentType::kCbPackage,
- IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize()));
+ CbObjectView Params = BatchRequest["params"sv].AsObjectView();
+
+ const auto GetChunkIdFromPayloadId = [](CbObjectView Record, const Oid& PayloadId) -> IoHash {
+ if (CbObjectView ValueObject = Record["Value"].AsObjectView())
+ {
+ const Oid Id = ValueObject["Id"].AsObjectId();
+ if (Id == PayloadId)
+ {
+ return ValueObject["RawHash"sv].AsHash();
}
- break;
- default:
- Request.WriteResponse(HttpResponseCode::BadRequest);
- break;
+ }
+
+ for (CbFieldView AttachmentView : Record["Attachments"sv])
+ {
+ CbObjectView AttachmentObject = AttachmentView.AsObjectView();
+ const Oid Id = AttachmentObject["Id"].AsObjectId();
+
+ if (Id == PayloadId)
+ {
+ return AttachmentObject["RawHash"sv].AsHash();
+ }
+ }
+
+ return IoHash::Zero;
+ };
+
+ std::vector<CacheChunk> CacheChunks;
+ std::vector<CachePolicy> ChunkPolicies;
+ std::vector<size_t> Missing;
+
+ for (CbFieldView ChunkView : Params["cachechunks"sv])
+ {
+ CbObjectView Chunk = ChunkView.AsObjectView();
+ CbObjectView CacheKeyObject = Chunk["key"sv].AsObjectView();
+ const CacheKey Key = CacheKey::Create(CacheKeyObject["bucket"sv].AsString(), CacheKeyObject["hash"sv].AsHash());
+ const Oid PayloadId = Chunk["id"sv].AsObjectId();
+ const uint64_t RawOffset = Chunk["rawoffset"sv].AsUInt64();
+ const uint64_t RawSize = Chunk["rawsize"sv].AsUInt64();
+ const uint32_t ChunkPolicy = Chunk["policy"sv].AsUInt32();
+
+ IoHash ChunkId = IoHash::Zero;
+
+ ZenCacheValue CacheValue;
+ if (m_CacheStore.Get(Key.Bucket, Key.Hash, CacheValue))
+ {
+ ChunkId = GetChunkIdFromPayloadId(CbObjectView(CacheValue.Value.Data()), PayloadId);
+ }
+
+ CacheChunks.emplace_back(Key, ChunkId, RawOffset, RawSize);
+ ChunkPolicies.emplace_back(static_cast<CachePolicy>(ChunkPolicy));
+ }
+
+ if (CacheChunks.empty())
+ {
+ return Request.WriteResponse(HttpResponseCode::BadRequest);
+ }
+
+ std::vector<IoBuffer> Chunks;
+ Chunks.resize(CacheChunks.size());
+
+ for (size_t Idx = 0; CacheChunk & CacheChunk : CacheChunks)
+ {
+ if (IoBuffer Chunk = m_CidStore.FindChunkByCid(CacheChunk.Id))
+ {
+ ZEN_DEBUG("HIT - '{}/{}/{}' {} '{}' ({})",
+ CacheChunk.Key.Bucket,
+ CacheChunk.Key.Hash,
+ CacheChunk.Id,
+ NiceBytes(Chunk.Size()),
+ ToString(Chunk.GetContentType()),
+ "LOCAL");
+
+ Chunks[Idx] = Chunk;
+ }
+ else
+ {
+ ZEN_DEBUG("MISS - '{}/{}/{}' '{}'",
+ CacheChunk.Key.Bucket,
+ CacheChunk.Key.Hash,
+ CacheChunk.Id,
+ ToString(HttpContentType::kCompressedBinary));
+
+ CacheChunk.Id = IoHash::Zero;
+ Missing.push_back(Idx);
+ }
+ ++Idx;
+ }
+
+ CbPackage Package;
+ CbObjectWriter BatchResponse;
+
+ BatchResponse.BeginArray("result"sv);
+
+ for (size_t Idx = 0; Idx < Chunks.size(); ++Idx)
+ {
+ if (Chunks[Idx])
+ {
+ BatchResponse << CacheChunks[Idx].Id;
+ Package.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(std::move(Chunks[Idx])))));
+ }
+ else
+ {
+ BatchResponse << IoHash::Zero;
+ }
}
+ BatchResponse.EndArray();
+
+ Package.SetObject(BatchResponse.Save());
+
+ BinaryWriter MemStream;
+ Package.Save(MemStream);
+
+ Request.WriteResponse(HttpResponseCode::OK,
+ HttpContentType::kCbPackage,
+ IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize()));
}
void
diff --git a/zenserver/cache/structuredcache.h b/zenserver/cache/structuredcache.h
index d6b4944fd..fe4453c2b 100644
--- a/zenserver/cache/structuredcache.h
+++ b/zenserver/cache/structuredcache.h
@@ -90,6 +90,8 @@ private:
void HandleGetCachePayload(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy);
void HandlePutCachePayload(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy);
void HandleBatchRequest(zen::HttpServerRequest& Request, CachePolicy Policy);
+ void HandleBatchGetCacheRecords(zen::HttpServerRequest& Request, CbObjectView BatchRequest, CachePolicy Policy);
+ void HandleBatchGetCacheChunks(zen::HttpServerRequest& Request, CbObjectView BatchRequest, CachePolicy Policy);
void HandleCacheBucketRequest(zen::HttpServerRequest& Request, std::string_view Bucket);
virtual void HandleStatsRequest(zen::HttpServerRequest& Request) override;
virtual void HandleStatusRequest(zen::HttpServerRequest& Request) override;