aboutsummaryrefslogtreecommitdiff
path: root/zenserver/cache
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2021-11-18 14:33:44 +0100
committerStefan Boberg <[email protected]>2021-11-18 14:33:44 +0100
commite53df312f3c4dcef19add9cd26afc324557b1f5a (patch)
treea3d7b59f29e484d48edffb2a26bbb0dd2d95533d /zenserver/cache
parentgc: implemented timestamped snapshot persistence (diff)
parentChange error code for failed upsteam apply (diff)
downloadzen-e53df312f3c4dcef19add9cd26afc324557b1f5a.tar.xz
zen-e53df312f3c4dcef19add9cd26afc324557b1f5a.zip
merge from main
Diffstat (limited to 'zenserver/cache')
-rw-r--r--zenserver/cache/structuredcache.cpp642
-rw-r--r--zenserver/cache/structuredcache.h5
-rw-r--r--zenserver/cache/structuredcachestore.cpp66
-rw-r--r--zenserver/cache/structuredcachestore.h85
4 files changed, 604 insertions, 194 deletions
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp
index 177cdbf55..53e1b1c61 100644
--- a/zenserver/cache/structuredcache.cpp
+++ b/zenserver/cache/structuredcache.cpp
@@ -14,7 +14,9 @@
#include <zencore/timer.h>
#include <zenhttp/httpserver.h>
#include <zenstore/CAS.h>
+#include <zenutil/cache/cache.h>
+//#include "cachekey.h"
#include "monitoring/httpstats.h"
#include "structuredcachestore.h"
#include "upstream/jupiter.h"
@@ -36,115 +38,24 @@ using namespace std::literals;
//////////////////////////////////////////////////////////////////////////
-namespace detail { namespace cacheopt {
- constexpr std::string_view Local = "local"sv;
- constexpr std::string_view Remote = "remote"sv;
- constexpr std::string_view Data = "data"sv;
- constexpr std::string_view Meta = "meta"sv;
- constexpr std::string_view Value = "value"sv;
- constexpr std::string_view Attachments = "attachments"sv;
-}} // namespace detail::cacheopt
-
-//////////////////////////////////////////////////////////////////////////
-
-enum class CachePolicy : uint8_t
-{
- None = 0,
- QueryLocal = 1 << 0,
- QueryRemote = 1 << 1,
- Query = QueryLocal | QueryRemote,
- StoreLocal = 1 << 2,
- StoreRemote = 1 << 3,
- Store = StoreLocal | StoreRemote,
- SkipMeta = 1 << 4,
- SkipValue = 1 << 5,
- SkipAttachments = 1 << 6,
- SkipData = SkipMeta | SkipValue | SkipAttachments,
- SkipLocalCopy = 1 << 7,
- Local = QueryLocal | StoreLocal,
- Remote = QueryRemote | StoreRemote,
- Default = Query | Store,
- Disable = None,
-};
-
-gsl_DEFINE_ENUM_BITMASK_OPERATORS(CachePolicy);
-
CachePolicy
ParseCachePolicy(const HttpServerRequest::QueryParams& QueryParams)
{
- CachePolicy QueryPolicy = CachePolicy::Query;
-
- {
- std::string_view Opts = QueryParams.GetValue("query"sv);
- if (!Opts.empty())
- {
- QueryPolicy = CachePolicy::None;
- ForEachStrTok(Opts, ',', [&QueryPolicy](const std::string_view& Opt) {
- if (Opt == detail::cacheopt::Local)
- {
- QueryPolicy |= CachePolicy::QueryLocal;
- }
- if (Opt == detail::cacheopt::Remote)
- {
- QueryPolicy |= CachePolicy::QueryRemote;
- }
- return true;
- });
- }
- }
-
- CachePolicy StorePolicy = CachePolicy::Store;
-
- {
- std::string_view Opts = QueryParams.GetValue("store"sv);
- if (!Opts.empty())
- {
- StorePolicy = CachePolicy::None;
- ForEachStrTok(Opts, ',', [&StorePolicy](const std::string_view& Opt) {
- if (Opt == detail::cacheopt::Local)
- {
- StorePolicy |= CachePolicy::StoreLocal;
- }
- if (Opt == detail::cacheopt::Remote)
- {
- StorePolicy |= CachePolicy::StoreRemote;
- }
- return true;
- });
- }
- }
-
- CachePolicy SkipPolicy = CachePolicy::None;
-
- {
- std::string_view Opts = QueryParams.GetValue("skip"sv);
- if (!Opts.empty())
- {
- ForEachStrTok(Opts, ',', [&SkipPolicy](const std::string_view& Opt) {
- if (Opt == detail::cacheopt::Meta)
- {
- SkipPolicy |= CachePolicy::SkipMeta;
- }
- if (Opt == detail::cacheopt::Value)
- {
- SkipPolicy |= CachePolicy::SkipValue;
- }
- if (Opt == detail::cacheopt::Attachments)
- {
- SkipPolicy |= CachePolicy::SkipAttachments;
- }
- if (Opt == detail::cacheopt::Data)
- {
- SkipPolicy |= CachePolicy::SkipData;
- }
- return true;
- });
- }
- }
+ const CachePolicy QueryPolicy = zen::ParseQueryCachePolicy(QueryParams.GetValue("query"sv));
+ const CachePolicy StorePolicy = zen::ParseStoreCachePolicy(QueryParams.GetValue("store"sv));
+ const CachePolicy SkipPolicy = zen::ParseSkipCachePolicy(QueryParams.GetValue("skip"sv));
return QueryPolicy | StorePolicy | SkipPolicy;
}
+struct AttachmentCount
+{
+ uint32_t New = 0;
+ uint32_t Valid = 0;
+ uint32_t Invalid = 0;
+ uint32_t Total = 0;
+};
+
//////////////////////////////////////////////////////////////////////////
HttpStructuredCacheService::HttpStructuredCacheService(ZenCacheStore& InCacheStore,
@@ -207,6 +118,11 @@ HttpStructuredCacheService::HandleRequest(HttpServerRequest& Request)
{
std::string_view Key = Request.RelativeUri();
+ if (Key == "$rpc")
+ {
+ return HandleRpcRequest(Request);
+ }
+
if (std::all_of(begin(Key), end(Key), [](const char c) { return std::isalnum(c); }))
{
// Bucket reference
@@ -319,8 +235,8 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request
if (ValidCount != AttachmentCount)
{
- Success = false;
- ZEN_WARN("GET - '{}/{}' '{}' FAILED, found '{}' of '{}' attachments",
+ // Success = false;
+ ZEN_WARN("GET - '{}/{}' '{}' is partial, found '{}' of '{}' attachments",
Ref.BucketSegment,
Ref.HashKey,
ToString(AcceptType),
@@ -543,52 +459,39 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request
CbObjectView CacheRecord(Body.Data());
std::vector<IoHash> ValidAttachments;
- uint32_t AttachmentCount = 0;
+ int32_t TotalCount = 0;
- CacheRecord.IterateAttachments([this, &AttachmentCount, &ValidAttachments](CbFieldView AttachmentHash) {
+ CacheRecord.IterateAttachments([this, &TotalCount, &ValidAttachments](CbFieldView AttachmentHash) {
const IoHash Hash = AttachmentHash.AsHash();
if (m_CidStore.ContainsChunk(Hash))
{
ValidAttachments.emplace_back(Hash);
}
- AttachmentCount++;
+ TotalCount++;
});
- const uint32_t ValidCount = static_cast<uint32_t>(ValidAttachments.size());
- const bool ValidCacheRecord = ValidCount == AttachmentCount;
-
- if (ValidCacheRecord)
- {
- ZEN_DEBUG("PUT - '{}/{}' {} '{}', {} attachments",
- Ref.BucketSegment,
- Ref.HashKey,
- NiceBytes(Body.Size()),
- ToString(ContentType),
- ValidCount);
+ ZEN_DEBUG("PUT - '{}/{}' {} '{}' attachments '{}/{}' (valid/total)",
+ Ref.BucketSegment,
+ Ref.HashKey,
+ NiceBytes(Body.Size()),
+ ToString(ContentType),
+ TotalCount,
+ ValidAttachments.size());
- m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, {.Value = Body});
+ Body.SetContentType(ZenContentType::kCbObject);
+ m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, {.Value = Body});
- if (StoreUpstream)
- {
- ZEN_ASSERT(m_UpstreamCache);
- auto Result = m_UpstreamCache->EnqueueUpstream({.Type = ZenContentType::kCbObject,
- .CacheKey = {Ref.BucketSegment, Ref.HashKey},
- .PayloadIds = std::move(ValidAttachments)});
- }
+ const bool IsPartialRecord = TotalCount != static_cast<int32_t>(ValidAttachments.size());
- Request.WriteResponse(HttpResponseCode::Created);
- }
- else
+ if (StoreUpstream && !IsPartialRecord)
{
- ZEN_WARN("PUT - '{}/{}' '{}' FAILED, found {}/{} attachments",
- Ref.BucketSegment,
- Ref.HashKey,
- ToString(ContentType),
- ValidCount,
- AttachmentCount);
-
- Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Missing attachments"sv);
+ ZEN_ASSERT(m_UpstreamCache);
+ auto Result = m_UpstreamCache->EnqueueUpstream({.Type = ZenContentType::kCbObject,
+ .CacheKey = {Ref.BucketSegment, Ref.HashKey},
+ .PayloadIds = std::move(ValidAttachments)});
}
+
+ Request.WriteResponse(HttpResponseCode::Created);
}
else if (ContentType == HttpContentType::kCbPackage)
{
@@ -600,16 +503,15 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request
return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid package"sv);
}
- CbObject CacheRecord = Package.GetObject();
-
- std::span<const CbAttachment> Attachments = Package.GetAttachments();
- std::vector<IoHash> ValidAttachments;
- int32_t NewAttachmentCount = 0;
+ CbObject CacheRecord = Package.GetObject();
+ AttachmentCount Count;
+ std::vector<IoHash> ValidAttachments;
- ValidAttachments.reserve(Attachments.size());
+ ValidAttachments.reserve(Package.GetAttachments().size());
- CacheRecord.IterateAttachments([this, &Ref, &Package, &ValidAttachments, &NewAttachmentCount](CbFieldView AttachmentHash) {
- if (const CbAttachment* Attachment = Package.FindAttachment(AttachmentHash.AsHash()))
+ CacheRecord.IterateAttachments([this, &Ref, &Package, &ValidAttachments, &Count](CbFieldView HashView) {
+ const IoHash Hash = HashView.AsHash();
+ if (const CbAttachment* Attachment = Package.FindAttachment(Hash))
{
if (Attachment->IsCompressedBinary())
{
@@ -620,8 +522,9 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request
if (InsertResult.New)
{
- NewAttachmentCount++;
+ Count.New++;
}
+ Count.Valid++;
}
else
{
@@ -629,40 +532,40 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request
Ref.BucketSegment,
Ref.HashKey,
ToString(HttpContentType::kCbPackage),
- AttachmentHash.AsHash());
+ Hash);
+ Count.Invalid++;
}
}
- else
+ else if (m_CidStore.ContainsChunk(Hash))
{
- ZEN_WARN("PUT - '{}/{}' '{}' FAILED, missing attachment '{}'",
- Ref.BucketSegment,
- Ref.HashKey,
- ToString(HttpContentType::kCbPackage),
- AttachmentHash.AsHash());
+ ValidAttachments.emplace_back(Hash);
+ Count.Valid++;
}
+ Count.Total++;
});
- const bool AttachmentsValid = ValidAttachments.size() == Attachments.size();
-
- if (!AttachmentsValid)
+ if (Count.Invalid > 0)
{
- return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid attachments"sv);
+ return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid attachment(s)"sv);
}
- ZEN_DEBUG("PUT - '{}/{}' {} '{}', {}/{} new attachments",
+ ZEN_DEBUG("PUT - '{}/{}' {} '{}', attachments '{}/{}/{}' (new/valid/total)",
Ref.BucketSegment,
Ref.HashKey,
NiceBytes(Body.GetSize()),
ToString(ContentType),
- NewAttachmentCount,
- Attachments.size());
+ Count.New,
+ Count.Valid,
+ Count.Total);
IoBuffer CacheRecordValue = CacheRecord.GetBuffer().AsIoBuffer();
CacheRecordValue.SetContentType(ZenContentType::kCbObject);
m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, {.Value = CacheRecord.GetBuffer().AsIoBuffer()});
- if (StoreUpstream)
+ const bool IsPartialRecord = Count.Valid != Count.Total;
+
+ if (StoreUpstream && !IsPartialRecord)
{
ZEN_ASSERT(m_UpstreamCache);
auto Result = m_UpstreamCache->EnqueueUpstream({.Type = ZenContentType::kCbPackage,
@@ -708,8 +611,7 @@ HttpStructuredCacheService::HandleGetCachePayload(zen::HttpServerRequest& Reques
if (QueryUpstream)
{
- if (auto UpstreamResult = m_UpstreamCache->GetCachePayload({{Ref.BucketSegment, Ref.HashKey}, Ref.PayloadId});
- UpstreamResult.Success)
+ if (auto UpstreamResult = m_UpstreamCache->GetCachePayload({Ref.BucketSegment, Ref.HashKey}, Ref.PayloadId); UpstreamResult.Success)
{
if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(UpstreamResult.Value)))
{
@@ -864,6 +766,420 @@ HttpStructuredCacheService::ValidateKeyUri(HttpServerRequest& Request, CacheRef&
}
void
+HttpStructuredCacheService::HandleRpcRequest(zen::HttpServerRequest& Request)
+{
+ switch (auto Verb = Request.RequestVerb())
+ {
+ using enum HttpVerb;
+
+ case kPost:
+ {
+ const HttpContentType ContentType = Request.RequestContentType();
+ const HttpContentType AcceptType = Request.AcceptContentType();
+
+ if (ContentType != HttpContentType::kCbObject || AcceptType != HttpContentType::kCbPackage)
+ {
+ return Request.WriteResponse(HttpResponseCode::BadRequest);
+ }
+
+ Request.WriteResponseAsync(
+ [this, RpcRequest = zen::LoadCompactBinaryObject(Request.ReadPayload())](HttpServerRequest& AsyncRequest) {
+ const std::string_view Method = RpcRequest["Method"sv].AsString();
+ if (Method == "GetCacheRecords"sv)
+ {
+ HandleRpcGetCacheRecords(AsyncRequest, RpcRequest);
+ }
+ else if (Method == "GetCachePayloads"sv)
+ {
+ HandleRpcGetCachePayloads(AsyncRequest, RpcRequest);
+ }
+ else
+ {
+ AsyncRequest.WriteResponse(HttpResponseCode::BadRequest);
+ }
+ });
+ }
+ break;
+ default:
+ Request.WriteResponse(HttpResponseCode::BadRequest);
+ break;
+ }
+}
+
+void
+HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Request, CbObjectView RpcRequest)
+{
+ using namespace fmt::literals;
+
+ CbPackage RpcResponse;
+ CacheRecordPolicy Policy;
+ CbObjectView Params = RpcRequest["Params"sv].AsObjectView();
+ std::vector<CacheKey> CacheKeys;
+ std::vector<IoBuffer> CacheValues;
+ std::vector<size_t> UpstreamRequests;
+
+ ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "GetCacheRecords"sv);
+
+ CacheRecordPolicy::Load(Params["Policy"sv].AsObjectView(), Policy);
+
+ const bool PartialOnError = Policy.HasRecordPolicy(CachePolicy::PartialOnError);
+ const bool SkipAttachments = Policy.HasRecordPolicy(CachePolicy::SkipAttachments);
+ const bool QueryRemote = Policy.HasRecordPolicy(CachePolicy::QueryRemote) && m_UpstreamCache;
+
+ for (CbFieldView KeyView : Params["CacheKeys"sv])
+ {
+ CbObjectView KeyObject = KeyView.AsObjectView();
+ CacheKeys.push_back(CacheKey::Create(KeyObject["Bucket"sv].AsString(), KeyObject["Hash"sv].AsHash()));
+ }
+
+ if (CacheKeys.empty())
+ {
+ return Request.WriteResponse(HttpResponseCode::BadRequest);
+ }
+
+ CacheValues.resize(CacheKeys.size());
+
+ for (size_t KeyIndex = 0; const CacheKey& Key : CacheKeys)
+ {
+ ZenCacheValue CacheValue;
+ uint32_t MissingCount = 0;
+
+ if (m_CacheStore.Get(Key.Bucket, Key.Hash, CacheValue))
+ {
+ CbObjectView CacheRecord(CacheValue.Value.Data());
+
+ if (!SkipAttachments)
+ {
+ CacheRecord.IterateAttachments([this, &MissingCount, &RpcResponse](CbFieldView AttachmentHash) {
+ if (IoBuffer Chunk = m_CidStore.FindChunkByCid(AttachmentHash.AsHash()))
+ {
+ RpcResponse.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Chunk))));
+ }
+ else
+ {
+ MissingCount++;
+ }
+ });
+ }
+ }
+
+ if (CacheValue.Value && (MissingCount == 0 || PartialOnError))
+ {
+ ZEN_DEBUG("HIT - '{}/{}' {} '{}' (LOCAL)",
+ Key.Bucket,
+ Key.Hash,
+ NiceBytes(CacheValue.Value.Size()),
+ ToString(CacheValue.Value.GetContentType()));
+
+ CacheValues[KeyIndex] = std::move(CacheValue.Value);
+ m_CacheStats.HitCount++;
+ }
+ else if (QueryRemote)
+ {
+ UpstreamRequests.push_back(KeyIndex);
+ }
+ else
+ {
+ ZEN_DEBUG("MISS - '{}/{}' {}", Key.Bucket, Key.Hash, MissingCount ? "(partial)"sv : ""sv);
+ m_CacheStats.MissCount++;
+ }
+
+ ++KeyIndex;
+ }
+
+ if (!UpstreamRequests.empty() && m_UpstreamCache)
+ {
+ const auto OnCacheRecordGetComplete =
+ [this, &CacheKeys, &CacheValues, &RpcResponse, PartialOnError, SkipAttachments](CacheRecordGetCompleteParams&& Params) {
+ ZEN_ASSERT(Params.KeyIndex < CacheValues.size());
+
+ IoBuffer CacheValue;
+ AttachmentCount Count;
+
+ if (Params.Record)
+ {
+ Params.Record.IterateAttachments([this, &RpcResponse, SkipAttachments, &Params, &Count](CbFieldView HashView) {
+ if (const CbAttachment* Attachment = Params.Package.FindAttachment(HashView.AsHash()))
+ {
+ if (CompressedBuffer Compressed = Attachment->AsCompressedBinary())
+ {
+ auto InsertResult = m_CidStore.AddChunk(Compressed);
+ if (InsertResult.New)
+ {
+ Count.New++;
+ }
+ Count.Valid++;
+
+ if (!SkipAttachments)
+ {
+ RpcResponse.AddAttachment(CbAttachment(Compressed));
+ }
+ }
+ else
+ {
+ ZEN_DEBUG("Uncompressed payload '{}' from upstream cache record '{}/{}'",
+ HashView.AsHash(),
+ Params.CacheKey.Bucket,
+ Params.CacheKey.Hash);
+ Count.Invalid++;
+ }
+ }
+ else if (m_CidStore.ContainsChunk(HashView.AsHash()))
+ {
+ Count.Valid++;
+ }
+ Count.Total++;
+ });
+
+ if ((Count.Valid == Count.Total) || PartialOnError)
+ {
+ CacheValue = CbObject::Clone(Params.Record).GetBuffer().AsIoBuffer();
+ }
+ }
+
+ if (CacheValue)
+ {
+ ZEN_DEBUG("HIT - '{}/{}' {} '{}' attachments '{}/{}/{}' (new/valid/total) (UPSTREAM)",
+ Params.CacheKey.Bucket,
+ Params.CacheKey.Hash,
+ NiceBytes(CacheValue.GetSize()),
+ ToString(HttpContentType::kCbPackage),
+ Count.New,
+ Count.Valid,
+ Count.Total);
+
+ CacheValue.SetContentType(ZenContentType::kCbObject);
+
+ CacheValues[Params.KeyIndex] = CacheValue;
+ m_CacheStore.Put(Params.CacheKey.Bucket, Params.CacheKey.Hash, {.Value = CacheValue});
+
+ m_CacheStats.HitCount++;
+ m_CacheStats.UpstreamHitCount++;
+ }
+ else
+ {
+ const bool IsPartial = Count.Valid != Count.Total;
+ ZEN_DEBUG("MISS - '{}/{}' {}", Params.CacheKey.Bucket, Params.CacheKey.Hash, IsPartial ? "(partial)"sv : ""sv);
+ m_CacheStats.MissCount++;
+ }
+ };
+
+ m_UpstreamCache->GetCacheRecords(CacheKeys, UpstreamRequests, Policy, std::move(OnCacheRecordGetComplete));
+ }
+
+ CbObjectWriter ResponseObject;
+
+ ResponseObject.BeginArray("Result"sv);
+ for (const IoBuffer& Value : CacheValues)
+ {
+ if (Value)
+ {
+ CbObjectView Record(Value.Data());
+ ResponseObject << Record;
+ }
+ else
+ {
+ ResponseObject.AddNull();
+ }
+ }
+ ResponseObject.EndArray();
+
+ RpcResponse.SetObject(ResponseObject.Save());
+
+ BinaryWriter MemStream;
+ RpcResponse.Save(MemStream);
+
+ Request.WriteResponse(HttpResponseCode::OK,
+ HttpContentType::kCbPackage,
+ IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize()));
+}
+
+void
+HttpStructuredCacheService::HandleRpcGetCachePayloads(zen::HttpServerRequest& Request, CbObjectView RpcRequest)
+{
+ using namespace fmt::literals;
+
+ ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "GetCachePayloads"sv);
+
+ std::vector<CacheChunkRequest> ChunkRequests;
+ std::vector<size_t> UpstreamRequests;
+ std::vector<IoBuffer> Chunks;
+ CbObjectView Params = RpcRequest["Params"sv].AsObjectView();
+
+ for (CbFieldView RequestView : Params["ChunkRequests"sv])
+ {
+ CbObjectView RequestObject = RequestView.AsObjectView();
+ CbObjectView KeyObject = RequestObject["Key"sv].AsObjectView();
+ const CacheKey Key = CacheKey::Create(KeyObject["Bucket"sv].AsString(), KeyObject["Hash"sv].AsHash());
+ const IoHash ChunkId = RequestObject["ChunkId"sv].AsHash();
+ const Oid PayloadId = RequestObject["PayloadId"sv].AsObjectId();
+ const uint64_t RawOffset = RequestObject["RawOffset"sv].AsUInt64();
+ const uint64_t RawSize = RequestObject["RawSize"sv].AsUInt64();
+ const uint32_t ChunkPolicy = RequestObject["Policy"sv].AsUInt32();
+
+ ChunkRequests.emplace_back(Key, ChunkId, PayloadId, RawOffset, RawSize, static_cast<CachePolicy>(ChunkPolicy));
+ }
+
+ if (ChunkRequests.empty())
+ {
+ return Request.WriteResponse(HttpResponseCode::BadRequest);
+ }
+
+ Chunks.resize(ChunkRequests.size());
+
+ // Unreal uses a 12 byte ID to address cache record payloads. When the uncompressed hash (ChunkId)
+ // is missing, load the cache record and try to find the raw hash from the payload ID.
+ {
+ const auto GetChunkIdFromPayloadId = [](CbObjectView Record, const Oid& PayloadId) -> IoHash {
+ if (CbObjectView ValueObject = Record["Value"sv].AsObjectView())
+ {
+ const Oid Id = ValueObject["Id"sv].AsObjectId();
+ if (Id == PayloadId)
+ {
+ return ValueObject["RawHash"sv].AsHash();
+ }
+ }
+
+ for (CbFieldView AttachmentView : Record["Attachments"sv])
+ {
+ CbObjectView AttachmentObject = AttachmentView.AsObjectView();
+ const Oid Id = AttachmentObject["Id"sv].AsObjectId();
+
+ if (Id == PayloadId)
+ {
+ return AttachmentObject["RawHash"sv].AsHash();
+ }
+ }
+
+ return IoHash::Zero;
+ };
+
+ CacheKey CurrentKey = CacheKey::Empty;
+ IoBuffer CurrentRecordBuffer;
+
+ for (CacheChunkRequest& ChunkRequest : ChunkRequests)
+ {
+ if (ChunkRequest.ChunkId != IoHash::Zero)
+ {
+ continue;
+ }
+
+ if (ChunkRequest.Key != CurrentKey)
+ {
+ CurrentKey = ChunkRequest.Key;
+
+ ZenCacheValue CacheValue;
+ if (m_CacheStore.Get(CurrentKey.Bucket, CurrentKey.Hash, CacheValue))
+ {
+ CurrentRecordBuffer = CacheValue.Value;
+ }
+ }
+
+ if (CurrentRecordBuffer)
+ {
+ ChunkRequest.ChunkId = GetChunkIdFromPayloadId(CbObjectView(CurrentRecordBuffer.GetData()), ChunkRequest.PayloadId);
+ }
+ }
+ }
+
+ for (size_t RequestIndex = 0; const CacheChunkRequest& ChunkRequest : ChunkRequests)
+ {
+ const bool QueryLocal = (ChunkRequest.Policy & CachePolicy::QueryLocal) == CachePolicy::QueryLocal;
+ const bool QueryRemote = (ChunkRequest.Policy & CachePolicy::QueryRemote) == CachePolicy::QueryRemote;
+
+ if (QueryLocal)
+ {
+ if (IoBuffer Chunk = m_CidStore.FindChunkByCid(ChunkRequest.ChunkId))
+ {
+ ZEN_DEBUG("HIT - '{}/{}/{}' {} '{}' ({})",
+ ChunkRequest.Key.Bucket,
+ ChunkRequest.Key.Hash,
+ ChunkRequest.ChunkId,
+ NiceBytes(Chunk.Size()),
+ ToString(Chunk.GetContentType()),
+ "LOCAL");
+
+ Chunks[RequestIndex] = Chunk;
+ m_CacheStats.HitCount++;
+ }
+ else if (QueryRemote)
+ {
+ UpstreamRequests.push_back(RequestIndex);
+ }
+ else
+ {
+ ZEN_DEBUG("MISS - '{}/{}/{}'", ChunkRequest.Key.Bucket, ChunkRequest.Key.Hash, ChunkRequest.ChunkId);
+ m_CacheStats.MissCount++;
+ }
+ }
+ else
+ {
+ ZEN_DEBUG("SKIP - '{}/{}/{}'", ChunkRequest.Key.Bucket, ChunkRequest.Key.Hash, ChunkRequest.ChunkId);
+ }
+
+ ++RequestIndex;
+ }
+
+ if (!UpstreamRequests.empty() && m_UpstreamCache)
+ {
+ const auto OnCachePayloadGetComplete = [this, &ChunkRequests, &Chunks](CachePayloadGetCompleteParams&& Params) {
+ if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Params.Payload)))
+ {
+ auto InsertResult = m_CidStore.AddChunk(Compressed);
+
+ ZEN_DEBUG("HIT - '{}/{}/{}' {} ({})",
+ Params.Request.Key.Bucket,
+ Params.Request.Key.Hash,
+ Params.Request.ChunkId,
+ NiceBytes(Params.Payload.GetSize()),
+ "UPSTREAM");
+
+ ZEN_ASSERT(Params.RequestIndex < Chunks.size());
+ Chunks[Params.RequestIndex] = std::move(Params.Payload);
+
+ m_CacheStats.HitCount++;
+ m_CacheStats.UpstreamHitCount++;
+ }
+ else
+ {
+ ZEN_DEBUG("MISS - '{}/{}/{}'", Params.Request.Key.Bucket, Params.Request.Key.Hash, Params.Request.ChunkId);
+ m_CacheStats.MissCount++;
+ }
+ };
+
+ m_UpstreamCache->GetCachePayloads(ChunkRequests, UpstreamRequests, std::move(OnCachePayloadGetComplete));
+ }
+
+ CbPackage RpcResponse;
+ CbObjectWriter ResponseObject;
+
+ ResponseObject.BeginArray("Result"sv);
+
+ for (size_t ChunkIndex = 0; ChunkIndex < Chunks.size(); ++ChunkIndex)
+ {
+ if (Chunks[ChunkIndex])
+ {
+ ResponseObject << ChunkRequests[ChunkIndex].ChunkId;
+ RpcResponse.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(std::move(Chunks[ChunkIndex])))));
+ }
+ else
+ {
+ ResponseObject << IoHash::Zero;
+ }
+ }
+ ResponseObject.EndArray();
+
+ RpcResponse.SetObject(ResponseObject.Save());
+
+ BinaryWriter MemStream;
+ RpcResponse.Save(MemStream);
+
+ Request.WriteResponse(HttpResponseCode::OK,
+ HttpContentType::kCbPackage,
+ IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize()));
+}
+
+void
HttpStructuredCacheService::HandleStatsRequest(zen::HttpServerRequest& Request)
{
CbObjectWriter Cbo;
diff --git a/zenserver/cache/structuredcache.h b/zenserver/cache/structuredcache.h
index 59749a024..a0215aa6e 100644
--- a/zenserver/cache/structuredcache.h
+++ b/zenserver/cache/structuredcache.h
@@ -21,7 +21,7 @@ class CidStore;
class ScrubContext;
class UpstreamCache;
class ZenCacheStore;
-enum class CachePolicy : uint8_t;
+enum class CachePolicy : uint32_t;
/**
* Structured cache service. Imposes constraints on keys, supports blobs and
@@ -89,6 +89,9 @@ private:
void HandleCachePayloadRequest(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy);
void HandleGetCachePayload(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy);
void HandlePutCachePayload(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy);
+ void HandleRpcRequest(zen::HttpServerRequest& Request);
+ void HandleRpcGetCacheRecords(zen::HttpServerRequest& Request, CbObjectView BatchRequest);
+ void HandleRpcGetCachePayloads(zen::HttpServerRequest& Request, CbObjectView BatchRequest);
void HandleCacheBucketRequest(zen::HttpServerRequest& Request, std::string_view Bucket);
virtual void HandleStatsRequest(zen::HttpServerRequest& Request) override;
virtual void HandleStatusRequest(zen::HttpServerRequest& Request) override;
diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp
index ac9f628d3..5cce7f325 100644
--- a/zenserver/cache/structuredcachestore.cpp
+++ b/zenserver/cache/structuredcachestore.cpp
@@ -190,7 +190,7 @@ ZenCacheMemoryLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCa
return false;
}
- CacheBucket* Bucket = Bucket = &it->second;
+ CacheBucket* Bucket = &it->second;
_.ReleaseNow();
@@ -347,44 +347,52 @@ ZenCacheMemoryLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue
//////////////////////////////////////////////////////////////////////////
-#pragma pack(push)
-#pragma pack(1)
+inline DiskLocation::DiskLocation() = default;
-struct DiskLocation
+inline DiskLocation::DiskLocation(uint64_t Offset, uint64_t ValueSize, uint32_t IndexSize, uint64_t Flags)
+: OffsetAndFlags(CombineOffsetAndFlags(Offset, Flags))
+, LowerSize(ValueSize & 0xFFFFffff)
+, IndexDataSize(IndexSize)
{
- inline DiskLocation() = default;
+}
- inline DiskLocation(uint64_t Offset, uint64_t ValueSize, uint32_t IndexSize, uint64_t Flags)
- : OffsetAndFlags(CombineOffsetAndFlags(Offset, Flags))
- , LowerSize(ValueSize & 0xFFFFffff)
- , IndexDataSize(IndexSize)
- {
- }
+inline uint64_t
+DiskLocation::CombineOffsetAndFlags(uint64_t Offset, uint64_t Flags)
+{
+ return Offset | Flags;
+}
- static const uint64_t kOffsetMask = 0x0000'ffFF'ffFF'ffFFull;
- static const uint64_t kSizeMask = 0x00FF'0000'0000'0000ull;
- static const uint64_t kFlagsMask = 0xff00'0000'0000'0000ull;
- static const uint64_t kStandaloneFile = 0x8000'0000'0000'0000ull;
- static const uint64_t kStructured = 0x4000'0000'0000'0000ull;
- static const uint64_t kTombStone = 0x2000'0000'0000'0000ull;
+inline uint64_t
+DiskLocation::Offset() const
+{
+ return OffsetAndFlags & kOffsetMask;
+}
- static uint64_t CombineOffsetAndFlags(uint64_t Offset, uint64_t Flags) { return Offset | Flags; }
+inline uint64_t
+DiskLocation::Size() const
+{
+ return LowerSize;
+}
- inline uint64_t Offset() const { return OffsetAndFlags & kOffsetMask; }
- inline uint64_t Size() const { return LowerSize; }
- inline uint64_t IsFlagSet(uint64_t Flag) const { return OffsetAndFlags & Flag; }
- inline ZenContentType GetContentType() const
- {
- ZenContentType ContentType = ZenContentType::kBinary;
+inline uint64_t
+DiskLocation::IsFlagSet(uint64_t Flag) const
+{
+ return OffsetAndFlags & Flag;
+}
- if (IsFlagSet(DiskLocation::kStructured))
- {
- ContentType = ZenContentType::kCbObject;
- }
+inline ZenContentType
+DiskLocation::GetContentType() const
+{
+ ZenContentType ContentType = ZenContentType::kBinary;
- return ContentType;
+ if (IsFlagSet(DiskLocation::kStructured))
+ {
+ ContentType = ZenContentType::kCbObject;
}
+ return ContentType;
+}
+
private:
uint64_t OffsetAndFlags = 0;
uint32_t LowerSize = 0;
diff --git a/zenserver/cache/structuredcachestore.h b/zenserver/cache/structuredcachestore.h
index 5a3191cc5..3040640f8 100644
--- a/zenserver/cache/structuredcachestore.h
+++ b/zenserver/cache/structuredcachestore.h
@@ -7,6 +7,7 @@
#include <zencore/iohash.h>
#include <zencore/thread.h>
#include <zencore/uid.h>
+#include <zenstore/basicfile.h>
#include <zenstore/cas.h>
#include <zenstore/gc.h>
@@ -103,6 +104,42 @@ private:
Configuration m_Configuration;
};
+#pragma pack(push)
+#pragma pack(1)
+
+struct DiskLocation
+{
+ static const uint64_t kOffsetMask = 0x0000'ffFF'ffFF'ffFFull;
+ static const uint64_t kSizeMask = 0x00FF'0000'0000'0000ull;
+ static const uint64_t kFlagsMask = 0xff00'0000'0000'0000ull;
+ static const uint64_t kStandaloneFile = 0x8000'0000'0000'0000ull;
+ static const uint64_t kStructured = 0x4000'0000'0000'0000ull;
+ static const uint64_t kTombStone = 0x2000'0000'0000'0000ull;
+
+ DiskLocation();
+ DiskLocation(uint64_t Offset, uint64_t ValueSize, uint32_t IndexSize, uint64_t Flags);
+ static uint64_t CombineOffsetAndFlags(uint64_t Offset, uint64_t Flags);
+ uint64_t Offset() const;
+ uint64_t Size() const;
+ uint64_t IsFlagSet(uint64_t Flag) const;
+ ZenContentType GetContentType() const;
+
+private:
+ uint64_t OffsetAndFlags = 0;
+ uint32_t LowerSize = 0;
+ uint32_t IndexDataSize = 0;
+};
+
+struct DiskIndexEntry
+{
+ IoHash Key;
+ DiskLocation Location;
+};
+
+#pragma pack(pop)
+
+static_assert(sizeof(DiskIndexEntry) == 36);
+
class ZenCacheDiskLayer
{
public:
@@ -122,7 +159,53 @@ private:
/** A cache bucket manages a single directory containing
metadata and data for that bucket
*/
- struct CacheBucket;
+ struct CacheBucket
+ {
+ CacheBucket();
+ ~CacheBucket();
+
+ void OpenOrCreate(std::filesystem::path BucketDir, bool AllowCreate = true);
+ static bool Delete(std::filesystem::path BucketDir);
+
+ bool Get(const IoHash& HashKey, ZenCacheValue& OutValue);
+ void Put(const IoHash& HashKey, const ZenCacheValue& Value);
+ void Drop();
+ void Flush();
+ void Scrub(ScrubContext& Ctx);
+ void GarbageCollect(GcContext& GcCtx);
+
+ inline bool IsOk() const { return m_IsOk; }
+
+ private:
+ std::filesystem::path m_BucketDir;
+ Oid m_BucketId;
+ bool m_IsOk = false;
+ uint64_t m_LargeObjectThreshold = 64 * 1024;
+
+ // These files are used to manage storage of small objects for this bucket
+
+ BasicFile m_SobsFile;
+ TCasLogFile<DiskIndexEntry> m_SlogFile;
+
+ RwLock m_IndexLock;
+ tsl::robin_map<IoHash, DiskLocation, IoHash::Hasher> m_Index;
+ uint64_t m_WriteCursor = 0;
+
+ void BuildPath(WideStringBuilderBase& Path, const IoHash& HashKey);
+ void PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value);
+ bool GetStandaloneCacheValue(const DiskLocation& Loc, const IoHash& HashKey, ZenCacheValue& OutValue);
+ bool GetInlineCacheValue(const DiskLocation& Loc, ZenCacheValue& OutValue);
+
+ // These locks are here to avoid contention on file creation, therefore it's sufficient
+ // that we take the same lock for the same hash
+ //
+ // These locks are small and should really be spaced out so they don't share cache lines,
+ // but we don't currently access them at particularly high frequency so it should not be
+ // an issue in practice
+
+ RwLock m_ShardedLocks[256];
+ inline RwLock& LockForHash(const IoHash& Hash) { return m_ShardedLocks[Hash.Hash[19]]; }
+ };
std::filesystem::path m_RootDir;
RwLock m_Lock;