aboutsummaryrefslogtreecommitdiff
path: root/zenserver/cache/structuredcache.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'zenserver/cache/structuredcache.cpp')
-rw-r--r--zenserver/cache/structuredcache.cpp353
1 files changed, 249 insertions, 104 deletions
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp
index 59cc74d53..a1b0e1549 100644
--- a/zenserver/cache/structuredcache.cpp
+++ b/zenserver/cache/structuredcache.cpp
@@ -231,8 +231,8 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request
if (ValidCount != AttachmentCount)
{
- Success = false;
- ZEN_WARN("GET - '{}/{}' '{}' FAILED, found '{}' of '{}' attachments",
+ // Success = false;
+ ZEN_WARN("GET - '{}/{}' '{}' is partial, found '{}' of '{}' attachments",
Ref.BucketSegment,
Ref.HashKey,
ToString(AcceptType),
@@ -554,12 +554,12 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request
}
});
- const bool AttachmentsValid = ValidAttachments.size() == Attachments.size();
+ const bool IsPartialCacheRecord = ValidAttachments.size() != Attachments.size();
- if (!AttachmentsValid)
- {
- return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid attachments"sv);
- }
+ // if (!AttachmentsValid)
+ //{
+ // return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid attachments"sv);
+ //}
ZEN_DEBUG("PUT - '{}/{}' {} '{}', {}/{} new attachments",
Ref.BucketSegment,
@@ -574,7 +574,7 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request
m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, {.Value = CacheRecord.GetBuffer().AsIoBuffer()});
- if (StoreUpstream)
+ if (StoreUpstream && !IsPartialCacheRecord)
{
ZEN_ASSERT(m_UpstreamCache);
auto Result = m_UpstreamCache->EnqueueUpstream({.Type = ZenContentType::kCbPackage,
@@ -786,15 +786,11 @@ HttpStructuredCacheService::HandleBatchRequest(zen::HttpServerRequest& Request,
{
using namespace fmt::literals;
- const bool SkipData = (Policy & CachePolicy::SkipData) == CachePolicy::SkipData;
- const bool SkipAttachments = (Policy & CachePolicy::SkipAttachments) == CachePolicy::SkipAttachments;
- const bool QueryUpstream = m_UpstreamCache && ((Policy & CachePolicy::QueryRemote) == CachePolicy::QueryRemote);
-
switch (auto Verb = Request.RequestVerb())
{
using enum HttpVerb;
- case kGet:
+ case kPost:
{
const HttpContentType ContentType = Request.RequestContentType();
const HttpContentType AcceptType = Request.AcceptContentType();
@@ -807,122 +803,271 @@ HttpStructuredCacheService::HandleBatchRequest(zen::HttpServerRequest& Request,
CbObject BatchRequest = zen::LoadCompactBinaryObject(Request.ReadPayload());
const std::string_view Method = BatchRequest["method"sv].AsString();
- if (Method != "getcacherecords"sv)
+ if (Method == "get-cache-records"sv)
{
- return Request.WriteResponse(HttpResponseCode::BadRequest,
- HttpContentType::kText,
- "Invalid method '{}'"_format(Method));
+ HandleBatchGetCacheRecords(Request, BatchRequest, Policy);
}
+ else if (Method == "get-cache-chunks"sv)
+ {
+ HandleBatchGetCacheChunks(Request, BatchRequest, Policy);
+ }
+ else
+ {
+ Request.WriteResponse(HttpResponseCode::BadRequest);
+ }
+ }
+ break;
+ default:
+ Request.WriteResponse(HttpResponseCode::BadRequest);
+ break;
+ }
+}
- CbObjectView Params = BatchRequest["params"sv].AsObjectView();
+void
+HttpStructuredCacheService::HandleBatchGetCacheRecords(zen::HttpServerRequest& Request, CbObjectView BatchRequest, CachePolicy Policy)
+{
+ using namespace fmt::literals;
- std::vector<CacheKey> CacheKeys;
- std::vector<IoBuffer> CacheValues;
- std::vector<IoBuffer> Payloads;
- std::vector<size_t> Missing;
+ const bool SkipData = (Policy & CachePolicy::SkipData) == CachePolicy::SkipData;
+ const bool SkipAttachments = (Policy & CachePolicy::SkipAttachments) == CachePolicy::SkipAttachments;
+ const bool QueryUpstream = m_UpstreamCache && ((Policy & CachePolicy::QueryRemote) == CachePolicy::QueryRemote);
- for (CbFieldView QueryView : Params["cachekeys"sv])
- {
- CbObjectView Query = QueryView.AsObjectView();
- CacheKeys.push_back(CacheKey::Create(Query["bucket"sv].AsString(), Query["key"sv].AsHash()));
- }
+ const std::string_view Method = BatchRequest["method"sv].AsString();
+ ZEN_ASSERT(Method == "get-cache-records"sv);
- CacheValues.resize(CacheKeys.size());
+ CbObjectView Params = BatchRequest["params"sv].AsObjectView();
- for (size_t Idx = 0; const CacheKey& Key : CacheKeys)
- {
- ZenCacheValue CacheValue;
- if (m_CacheStore.Get(Key.Bucket, Key.Hash, CacheValue))
- {
- CbObjectView CacheRecord(CacheValue.Value.Data());
+ std::vector<CacheKey> CacheKeys;
+ std::vector<IoBuffer> CacheValues;
+ std::vector<IoBuffer> Payloads;
+ std::vector<size_t> Missing;
- if (!SkipAttachments)
- {
- CacheRecord.IterateAttachments([this, &Payloads](CbFieldView AttachmentHash) {
- if (IoBuffer Chunk = m_CidStore.FindChunkByCid(AttachmentHash.AsHash()))
- {
- Payloads.push_back(Chunk);
- }
- });
- }
+ for (CbFieldView QueryView : Params["cachekeys"sv])
+ {
+ CbObjectView Query = QueryView.AsObjectView();
+ CacheKeys.push_back(CacheKey::Create(Query["bucket"sv].AsString(), Query["hash"sv].AsHash()));
+ }
- CacheValues[Idx] = CacheValue.Value;
- }
- else
+ CacheValues.resize(CacheKeys.size());
+
+ for (size_t Idx = 0; const CacheKey& Key : CacheKeys)
+ {
+ ZenCacheValue CacheValue;
+ if (m_CacheStore.Get(Key.Bucket, Key.Hash, CacheValue))
+ {
+ CbObjectView CacheRecord(CacheValue.Value.Data());
+
+ if (!SkipAttachments)
+ {
+ CacheRecord.IterateAttachments([this, &Payloads](CbFieldView AttachmentHash) {
+ if (IoBuffer Chunk = m_CidStore.FindChunkByCid(AttachmentHash.AsHash()))
{
- Missing.push_back(Idx);
+ Payloads.push_back(Chunk);
}
+ });
+ }
- ++Idx;
- }
+ CacheValues[Idx] = CacheValue.Value;
+ }
+ else
+ {
+ Missing.push_back(Idx);
+ }
+
+ ++Idx;
+ }
- if (QueryUpstream)
+ if (!Missing.empty() && QueryUpstream)
+ {
+ auto UpstreamResult = m_UpstreamCache->GetCacheRecords(
+ CacheKeys,
+ Missing,
+ [this, &CacheValues, &Payloads, SkipAttachments](size_t KeyIndex, IoBuffer UpstreamValue) {
+ CbPackage UpstreamPackage;
+ if (UpstreamValue && UpstreamPackage.TryLoad(UpstreamValue))
{
- auto UpstreamResult = m_UpstreamCache->GetCacheRecords(
- CacheKeys,
- Missing,
- [this, &CacheValues, &Payloads, SkipAttachments](size_t KeyIndex, IoBuffer UpstreamValue) {
- CbPackage UpstreamPackage;
- if (UpstreamValue && UpstreamPackage.TryLoad(UpstreamValue))
+ CbObjectView CacheRecord = UpstreamPackage.GetObject();
+
+ CacheRecord.IterateAttachments([&](CbFieldView AttachmentHash) {
+ if (const CbAttachment* Attachment = UpstreamPackage.FindAttachment(AttachmentHash.AsHash()))
+ {
+ if (CompressedBuffer Chunk = Attachment->AsCompressedBinary())
{
- CbObjectView CacheRecord = UpstreamPackage.GetObject();
-
- CacheRecord.IterateAttachments([&](CbFieldView AttachmentHash) {
- if (const CbAttachment* Attachment = UpstreamPackage.FindAttachment(AttachmentHash.AsHash()))
- {
- if (CompressedBuffer Chunk = Attachment->AsCompressedBinary())
- {
- m_CidStore.AddChunk(Chunk);
-
- if (!SkipAttachments)
- {
- Payloads.push_back(Chunk.GetCompressed().Flatten().AsIoBuffer());
- }
- }
- }
- });
-
- CacheValues[KeyIndex] = IoBufferBuilder::MakeCloneFromMemory(CacheRecord.GetView());
+ m_CidStore.AddChunk(Chunk);
+
+ if (!SkipAttachments)
+ {
+ Payloads.push_back(Chunk.GetCompressed().Flatten().AsIoBuffer());
+ }
}
- });
+ }
+ });
+
+ CacheValues[KeyIndex] = IoBufferBuilder::MakeCloneFromMemory(CacheRecord.GetView());
}
+ });
+ }
- CbObjectWriter BatchResponse;
+ CbObjectWriter BatchResponse;
- BatchResponse.BeginArray("result"sv);
- for (const IoBuffer& Value : CacheValues)
- {
- if (Value)
- {
- BatchResponse << CbObjectView(Value.Data());
- }
- else
- {
- BatchResponse.AddNull();
- }
- }
- BatchResponse.EndArray();
+ BatchResponse.BeginArray("result"sv);
+ for (const IoBuffer& Value : CacheValues)
+ {
+ if (Value)
+ {
+ CbObjectView Record(Value.Data());
+ BatchResponse << Record;
+ }
+ else
+ {
+ BatchResponse.AddNull();
+ }
+ }
+ BatchResponse.EndArray();
- CbPackage Package;
- Package.SetObject(BatchResponse.Save());
+ CbPackage Package;
+ Package.SetObject(BatchResponse.Save());
- for (const IoBuffer& Payload : Payloads)
- {
- Package.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Payload))));
- }
+ for (const IoBuffer& Payload : Payloads)
+ {
+ Package.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Payload))));
+ }
+
+ BinaryWriter MemStream;
+ Package.Save(MemStream);
+
+ Request.WriteResponse(HttpResponseCode::OK,
+ HttpContentType::kCbPackage,
+ IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize()));
+}
+
+void
+HttpStructuredCacheService::HandleBatchGetCacheChunks(zen::HttpServerRequest& Request, CbObjectView BatchRequest, CachePolicy Policy)
+{
+ using namespace fmt::literals;
- BinaryWriter MemStream;
- Package.Save(MemStream);
+ const bool SkipData = (Policy & CachePolicy::SkipData) == CachePolicy::SkipData;
+ const bool SkipAttachments = (Policy & CachePolicy::SkipAttachments) == CachePolicy::SkipAttachments;
+ const bool QueryUpstream = m_UpstreamCache && ((Policy & CachePolicy::QueryRemote) == CachePolicy::QueryRemote);
+
+ const std::string_view Method = BatchRequest["method"sv].AsString();
+ ZEN_ASSERT(Method == "get-cache-chunks"sv);
- Request.WriteResponse(HttpResponseCode::OK,
- HttpContentType::kCbPackage,
- IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize()));
+ CbObjectView Params = BatchRequest["params"sv].AsObjectView();
+
+ const auto GetChunkIdFromPayloadId = [](CbObjectView Record, const Oid& PayloadId) -> IoHash {
+ if (CbObjectView ValueObject = Record["Value"].AsObjectView())
+ {
+ const Oid Id = ValueObject["Id"].AsObjectId();
+ if (Id == PayloadId)
+ {
+ return ValueObject["RawHash"sv].AsHash();
}
- break;
- default:
- Request.WriteResponse(HttpResponseCode::BadRequest);
- break;
+ }
+
+ for (CbFieldView AttachmentView : Record["Attachments"sv])
+ {
+ CbObjectView AttachmentObject = AttachmentView.AsObjectView();
+ const Oid Id = AttachmentObject["Id"].AsObjectId();
+
+ if (Id == PayloadId)
+ {
+ return AttachmentObject["RawHash"sv].AsHash();
+ }
+ }
+
+ return IoHash::Zero;
+ };
+
+ std::vector<CacheChunk> CacheChunks;
+ std::vector<CachePolicy> ChunkPolicies;
+ std::vector<size_t> Missing;
+
+ for (CbFieldView ChunkView : Params["cachechunks"sv])
+ {
+ CbObjectView Chunk = ChunkView.AsObjectView();
+ CbObjectView CacheKeyObject = Chunk["key"sv].AsObjectView();
+ const CacheKey Key = CacheKey::Create(CacheKeyObject["bucket"sv].AsString(), CacheKeyObject["hash"sv].AsHash());
+ const Oid PayloadId = Chunk["id"sv].AsObjectId();
+ const uint64_t RawOffset = Chunk["rawoffset"sv].AsUInt64();
+ const uint64_t RawSize = Chunk["rawsize"sv].AsUInt64();
+ const uint32_t ChunkPolicy = Chunk["policy"sv].AsUInt32();
+
+ IoHash ChunkId = IoHash::Zero;
+
+ ZenCacheValue CacheValue;
+ if (m_CacheStore.Get(Key.Bucket, Key.Hash, CacheValue))
+ {
+ ChunkId = GetChunkIdFromPayloadId(CbObjectView(CacheValue.Value.Data()), PayloadId);
+ }
+
+ CacheChunks.emplace_back(Key, ChunkId, RawOffset, RawSize);
+ ChunkPolicies.emplace_back(static_cast<CachePolicy>(ChunkPolicy));
+ }
+
+ if (CacheChunks.empty())
+ {
+ return Request.WriteResponse(HttpResponseCode::BadRequest);
+ }
+
+ std::vector<IoBuffer> Chunks;
+ Chunks.resize(CacheChunks.size());
+
+ for (size_t Idx = 0; CacheChunk & CacheChunk : CacheChunks)
+ {
+ if (IoBuffer Chunk = m_CidStore.FindChunkByCid(CacheChunk.Id))
+ {
+ ZEN_DEBUG("HIT - '{}/{}/{}' {} '{}' ({})",
+ CacheChunk.Key.Bucket,
+ CacheChunk.Key.Hash,
+ CacheChunk.Id,
+ NiceBytes(Chunk.Size()),
+ ToString(Chunk.GetContentType()),
+ "LOCAL");
+
+ Chunks[Idx] = Chunk;
+ }
+ else
+ {
+ ZEN_DEBUG("MISS - '{}/{}/{}' '{}'",
+ CacheChunk.Key.Bucket,
+ CacheChunk.Key.Hash,
+ CacheChunk.Id,
+ ToString(HttpContentType::kCompressedBinary));
+
+ CacheChunk.Id = IoHash::Zero;
+ Missing.push_back(Idx);
+ }
+ ++Idx;
+ }
+
+ CbPackage Package;
+ CbObjectWriter BatchResponse;
+
+ BatchResponse.BeginArray("result"sv);
+
+ for (size_t Idx = 0; Idx < Chunks.size(); ++Idx)
+ {
+ if (Chunks[Idx])
+ {
+ BatchResponse << CacheChunks[Idx].Id;
+ Package.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(std::move(Chunks[Idx])))));
+ }
+ else
+ {
+ BatchResponse << IoHash::Zero;
+ }
}
+ BatchResponse.EndArray();
+
+ Package.SetObject(BatchResponse.Save());
+
+ BinaryWriter MemStream;
+ Package.Save(MemStream);
+
+ Request.WriteResponse(HttpResponseCode::OK,
+ HttpContentType::kCbPackage,
+ IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize()));
}
void