diff options
Diffstat (limited to 'zenserver/cache/structuredcache.cpp')
| -rw-r--r-- | zenserver/cache/structuredcache.cpp | 490 |
1 files changed, 378 insertions, 112 deletions
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp index 35cb02cbb..10fbb3709 100644 --- a/zenserver/cache/structuredcache.cpp +++ b/zenserver/cache/structuredcache.cpp @@ -12,6 +12,7 @@ #include <zenhttp/httpserver.h> #include <zenstore/CAS.h> +#include "cachekey.h" #include "monitoring/httpstats.h" #include "structuredcache.h" #include "structuredcachestore.h" @@ -36,111 +37,12 @@ using namespace std::literals; ////////////////////////////////////////////////////////////////////////// -namespace detail { namespace cacheopt { - constexpr std::string_view Local = "local"sv; - constexpr std::string_view Remote = "remote"sv; - constexpr std::string_view Data = "data"sv; - constexpr std::string_view Meta = "meta"sv; - constexpr std::string_view Value = "value"sv; - constexpr std::string_view Attachments = "attachments"sv; -}} // namespace detail::cacheopt - -////////////////////////////////////////////////////////////////////////// - -enum class CachePolicy : uint8_t -{ - None = 0, - QueryLocal = 1 << 0, - QueryRemote = 1 << 1, - Query = QueryLocal | QueryRemote, - StoreLocal = 1 << 2, - StoreRemote = 1 << 3, - Store = StoreLocal | StoreRemote, - SkipMeta = 1 << 4, - SkipValue = 1 << 5, - SkipAttachments = 1 << 6, - SkipData = SkipMeta | SkipValue | SkipAttachments, - SkipLocalCopy = 1 << 7, - Local = QueryLocal | StoreLocal, - Remote = QueryRemote | StoreRemote, - Default = Query | Store, - Disable = None, -}; - -gsl_DEFINE_ENUM_BITMASK_OPERATORS(CachePolicy); - CachePolicy ParseCachePolicy(const HttpServerRequest::QueryParams& QueryParams) { - CachePolicy QueryPolicy = CachePolicy::Query; - - { - std::string_view Opts = QueryParams.GetValue("query"sv); - if (!Opts.empty()) - { - QueryPolicy = CachePolicy::None; - ForEachStrTok(Opts, ',', [&QueryPolicy](const std::string_view& Opt) { - if (Opt == detail::cacheopt::Local) - { - QueryPolicy |= CachePolicy::QueryLocal; - } - if (Opt == detail::cacheopt::Remote) - { - QueryPolicy |= CachePolicy::QueryRemote; - } - return true; - }); - } - } - - CachePolicy StorePolicy = CachePolicy::Store; - - { - std::string_view Opts = QueryParams.GetValue("store"sv); - if (!Opts.empty()) - { - StorePolicy = CachePolicy::None; - ForEachStrTok(Opts, ',', [&StorePolicy](const std::string_view& Opt) { - if (Opt == detail::cacheopt::Local) - { - StorePolicy |= CachePolicy::StoreLocal; - } - if (Opt == detail::cacheopt::Remote) - { - StorePolicy |= CachePolicy::StoreRemote; - } - return true; - }); - } - } - - CachePolicy SkipPolicy = CachePolicy::None; - - { - std::string_view Opts = QueryParams.GetValue("skip"sv); - if (!Opts.empty()) - { - ForEachStrTok(Opts, ',', [&SkipPolicy](const std::string_view& Opt) { - if (Opt == detail::cacheopt::Meta) - { - SkipPolicy |= CachePolicy::SkipMeta; - } - if (Opt == detail::cacheopt::Value) - { - SkipPolicy |= CachePolicy::SkipValue; - } - if (Opt == detail::cacheopt::Attachments) - { - SkipPolicy |= CachePolicy::SkipAttachments; - } - if (Opt == detail::cacheopt::Data) - { - SkipPolicy |= CachePolicy::SkipData; - } - return true; - }); - } - } + const CachePolicy QueryPolicy = zen::ParseQueryCachePolicy(QueryParams.GetValue("query"sv)); + const CachePolicy StorePolicy = zen::ParseStoreCachePolicy(QueryParams.GetValue("store"sv)); + const CachePolicy SkipPolicy = zen::ParseSkipCachePolicy(QueryParams.GetValue("skip"sv)); return QueryPolicy | StorePolicy | SkipPolicy; } @@ -210,6 +112,11 @@ HttpStructuredCacheService::HandleRequest(HttpServerRequest& Request) { std::string_view Key = Request.RelativeUri(); + if (Key == "$batch") + { + return HandleBatchRequest(Request); + } + if (std::all_of(begin(Key), end(Key), [](const char c) { return std::isalnum(c); })) { // Bucket reference @@ -322,8 +229,8 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request if (ValidCount != AttachmentCount) { - Success = false; - ZEN_WARN("GET - '{}/{}' '{}' FAILED, found '{}' of '{}' attachments", + // Success = false; + ZEN_WARN("GET - '{}/{}' '{}' is partial, found '{}' of '{}' attachments", Ref.BucketSegment, Ref.HashKey, ToString(AcceptType), @@ -645,12 +552,12 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request } }); - const bool AttachmentsValid = ValidAttachments.size() == Attachments.size(); + const bool IsPartialCacheRecord = ValidAttachments.size() != Attachments.size(); - if (!AttachmentsValid) - { - return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid attachments"sv); - } + // if (!AttachmentsValid) + //{ + // return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid attachments"sv); + //} ZEN_DEBUG("PUT - '{}/{}' {} '{}', {}/{} new attachments", Ref.BucketSegment, @@ -665,7 +572,7 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, {.Value = CacheRecord.GetBuffer().AsIoBuffer()}); - if (StoreUpstream) + if (StoreUpstream && !IsPartialCacheRecord) { ZEN_ASSERT(m_UpstreamCache); auto Result = m_UpstreamCache->EnqueueUpstream({.Type = ZenContentType::kCbPackage, @@ -711,8 +618,7 @@ HttpStructuredCacheService::HandleGetCachePayload(zen::HttpServerRequest& Reques if (QueryUpstream) { - if (auto UpstreamResult = m_UpstreamCache->GetCachePayload({{Ref.BucketSegment, Ref.HashKey}, Ref.PayloadId}); - UpstreamResult.Success) + if (auto UpstreamResult = m_UpstreamCache->GetCachePayload({Ref.BucketSegment, Ref.HashKey}, Ref.PayloadId); UpstreamResult.Success) { if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(UpstreamResult.Value))) { @@ -873,6 +779,366 @@ HttpStructuredCacheService::ValidateKeyUri(HttpServerRequest& Request, CacheRef& } void +HttpStructuredCacheService::HandleBatchRequest(zen::HttpServerRequest& Request) +{ + switch (auto Verb = Request.RequestVerb()) + { + using enum HttpVerb; + + case kPost: + { + const HttpContentType ContentType = Request.RequestContentType(); + const HttpContentType AcceptType = Request.AcceptContentType(); + + if (ContentType != HttpContentType::kCbObject || AcceptType != HttpContentType::kCbPackage) + { + return Request.WriteResponse(HttpResponseCode::BadRequest); + } + + Request.WriteResponseAsync( + [this, BatchRequest = zen::LoadCompactBinaryObject(Request.ReadPayload())](HttpServerRequest& AsyncRequest) { + const std::string_view Method = BatchRequest["Method"sv].AsString(); + if (Method == "GetCacheRecords"sv) + { + HandleBatchGetCacheRecords(AsyncRequest, BatchRequest); + } + else if (Method == "GetCachePayloads"sv) + { + HandleBatchGetCachePayloads(AsyncRequest, BatchRequest); + } + else + { + AsyncRequest.WriteResponse(HttpResponseCode::BadRequest); + } + }); + } + break; + default: + Request.WriteResponse(HttpResponseCode::BadRequest); + break; + } +} + +void +HttpStructuredCacheService::HandleBatchGetCacheRecords(zen::HttpServerRequest& Request, CbObjectView BatchRequest) +{ + using namespace fmt::literals; + + CbPackage BatchResponse; + CacheRecordPolicy Policy; + CbObjectView Params = BatchRequest["Params"sv].AsObjectView(); + std::vector<CacheKey> CacheKeys; + std::vector<IoBuffer> CacheValues; + std::vector<size_t> UpstreamRequests; + + ZEN_ASSERT(BatchRequest["Method"sv].AsString() == "GetCacheRecords"sv); + + CacheRecordPolicy::Load(Params["Policy"sv].AsObjectView(), Policy); + + const bool SkipAttachments = (Policy.GetRecordPolicy() & CachePolicy::SkipAttachments) == CachePolicy::SkipAttachments; + const bool QueryRemote = m_UpstreamCache && ((Policy.GetRecordPolicy() & CachePolicy::QueryRemote) == CachePolicy::QueryRemote); + + for (CbFieldView KeyView : Params["CacheKeys"sv]) + { + CbObjectView KeyObject = KeyView.AsObjectView(); + CacheKeys.push_back(CacheKey::Create(KeyObject["Bucket"sv].AsString(), KeyObject["Hash"sv].AsHash())); + } + + if (CacheKeys.empty()) + { + return Request.WriteResponse(HttpResponseCode::BadRequest); + } + + CacheValues.resize(CacheKeys.size()); + + for (size_t KeyIndex = 0; const CacheKey& Key : CacheKeys) + { + ZenCacheValue CacheValue; + if (m_CacheStore.Get(Key.Bucket, Key.Hash, CacheValue)) + { + CbObjectView CacheRecord(CacheValue.Value.Data()); + + if (!SkipAttachments) + { + CacheRecord.IterateAttachments([this, &BatchResponse](CbFieldView AttachmentHash) { + if (IoBuffer Chunk = m_CidStore.FindChunkByCid(AttachmentHash.AsHash())) + { + BatchResponse.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Chunk)))); + } + }); + } + + ZEN_DEBUG("HIT - '{}/{}' {} '{}' (LOCAL)", + Key.Bucket, + Key.Hash, + NiceBytes(CacheValue.Value.Size()), + ToString(CacheValue.Value.GetContentType())); + + CacheValues[KeyIndex] = CacheValue.Value; + m_CacheStats.HitCount++; + } + else if (QueryRemote) + { + UpstreamRequests.push_back(KeyIndex); + } + else + { + ZEN_DEBUG("MISS - '{}/{}'", Key.Bucket, Key.Hash); + m_CacheStats.MissCount++; + } + + ++KeyIndex; + } + + if (!UpstreamRequests.empty() && m_UpstreamCache) + { + const auto OnCacheRecordGetComplete = + [this, &CacheKeys, &CacheValues, &BatchResponse, SkipAttachments](CacheRecordGetCompleteParams&& Params) { + if (Params.Record) + { + Params.Record.IterateAttachments([&](CbFieldView AttachmentHash) { + if (const CbAttachment* Attachment = Params.Package.FindAttachment(AttachmentHash.AsHash())) + { + if (CompressedBuffer Compressed = Attachment->AsCompressedBinary()) + { + m_CidStore.AddChunk(Compressed); + + if (!SkipAttachments) + { + BatchResponse.AddAttachment(CbAttachment(Compressed)); + } + } + } + }); + + ZEN_DEBUG("HIT - '{}/{}' {} '{}' (UPSTREAM)", + Params.CacheKey.Bucket, + Params.CacheKey.Hash, + NiceBytes(Params.Record.GetView().GetSize()), + ToString(HttpContentType::kCbObject)); + + CacheValues[Params.KeyIndex] = IoBufferBuilder::MakeCloneFromMemory(Params.Record.GetView()); + m_CacheStats.HitCount++; + m_CacheStats.UpstreamHitCount++; + } + else + { + ZEN_DEBUG("MISS - '{}/{}'", Params.CacheKey.Bucket, Params.CacheKey.Hash); + m_CacheStats.MissCount++; + } + }; + + m_UpstreamCache->GetCacheRecords(CacheKeys, UpstreamRequests, Policy, std::move(OnCacheRecordGetComplete)); + } + + CbObjectWriter ResponseObject; + + ResponseObject.BeginArray("Result"sv); + for (const IoBuffer& Value : CacheValues) + { + if (Value) + { + CbObjectView Record(Value.Data()); + ResponseObject << Record; + } + else + { + ResponseObject.AddNull(); + } + } + ResponseObject.EndArray(); + + BatchResponse.SetObject(ResponseObject.Save()); + + BinaryWriter MemStream; + BatchResponse.Save(MemStream); + + Request.WriteResponse(HttpResponseCode::OK, + HttpContentType::kCbPackage, + IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize())); +} + +void +HttpStructuredCacheService::HandleBatchGetCachePayloads(zen::HttpServerRequest& Request, CbObjectView BatchRequest) +{ + using namespace fmt::literals; + + ZEN_ASSERT(BatchRequest["Method"sv].AsString() == "GetCachePayloads"sv); + + std::vector<CacheChunkRequest> ChunkRequests; + std::vector<size_t> UpstreamRequests; + std::vector<IoBuffer> Chunks; + CbObjectView Params = BatchRequest["Params"sv].AsObjectView(); + + for (CbFieldView RequestView : Params["ChunkRequests"sv]) + { + CbObjectView RequestObject = RequestView.AsObjectView(); + CbObjectView KeyObject = RequestObject["Key"sv].AsObjectView(); + const CacheKey Key = CacheKey::Create(KeyObject["Bucket"sv].AsString(), KeyObject["Hash"sv].AsHash()); + const IoHash ChunkId = IoHash::Zero; + const Oid PayloadId = RequestObject["PayloadId"sv].AsObjectId(); + const uint64_t RawOffset = RequestObject["RawoffSet"sv].AsUInt64(); + const uint64_t RawSize = RequestObject["RawSize"sv].AsUInt64(); + const uint32_t ChunkPolicy = RequestObject["Policy"sv].AsUInt32(); + + ChunkRequests.emplace_back(Key, ChunkId, PayloadId, RawOffset, RawSize, static_cast<CachePolicy>(ChunkPolicy)); + } + + if (ChunkRequests.empty()) + { + return Request.WriteResponse(HttpResponseCode::BadRequest); + } + + Chunks.resize(ChunkRequests.size()); + + // Try to find the uncompressed raw hash from the payload ID. + { + const auto GetChunkIdFromPayloadId = [](CbObjectView Record, const Oid& PayloadId) -> IoHash { + if (CbObjectView ValueObject = Record["Value"].AsObjectView()) + { + const Oid Id = ValueObject["Id"].AsObjectId(); + if (Id == PayloadId) + { + return ValueObject["RawHash"sv].AsHash(); + } + } + + for (CbFieldView AttachmentView : Record["Attachments"sv]) + { + CbObjectView AttachmentObject = AttachmentView.AsObjectView(); + const Oid Id = AttachmentObject["Id"].AsObjectId(); + + if (Id == PayloadId) + { + return AttachmentObject["RawHash"sv].AsHash(); + } + } + + return IoHash::Zero; + }; + + CacheKey CurrentKey = CacheKey::Empty; + IoBuffer CurrentRecordBuffer; + + std::stable_sort(ChunkRequests.begin(), ChunkRequests.end()); + + for (CacheChunkRequest& ChunkRequest : ChunkRequests) + { + if (ChunkRequest.Key != CurrentKey) + { + CurrentKey = ChunkRequest.Key; + + ZenCacheValue CacheValue; + if (m_CacheStore.Get(ChunkRequest.Key.Bucket, ChunkRequest.Key.Hash, CacheValue)) + { + CurrentRecordBuffer = CacheValue.Value; + } + } + + if (CurrentRecordBuffer) + { + ChunkRequest.ChunkId = GetChunkIdFromPayloadId(CbObjectView(CurrentRecordBuffer.GetData()), ChunkRequest.PayloadId); + } + } + } + + for (size_t RequestIndex = 0; const CacheChunkRequest& ChunkRequest : ChunkRequests) + { + const bool QueryLocal = (ChunkRequest.Policy & CachePolicy::QueryLocal) == CachePolicy::QueryLocal; + const bool QueryRemote = (ChunkRequest.Policy & CachePolicy::QueryRemote) == CachePolicy::QueryRemote; + + if (QueryLocal) + { + if (IoBuffer Chunk = m_CidStore.FindChunkByCid(ChunkRequest.ChunkId)) + { + ZEN_DEBUG("HIT - '{}/{}/{}' {} '{}' ({})", + ChunkRequest.Key.Bucket, + ChunkRequest.Key.Hash, + ChunkRequest.ChunkId, + NiceBytes(Chunk.Size()), + ToString(Chunk.GetContentType()), + "LOCAL"); + + Chunks[RequestIndex] = Chunk; + m_CacheStats.HitCount++; + } + else if (QueryRemote) + { + UpstreamRequests.push_back(RequestIndex); + } + else + { + ZEN_DEBUG("MISS - '{}/{}/{}'", ChunkRequest.Key.Bucket, ChunkRequest.Key.Hash, ChunkRequest.ChunkId); + m_CacheStats.MissCount++; + } + } + else + { + ZEN_DEBUG("SKIP - '{}/{}/{}'", ChunkRequest.Key.Bucket, ChunkRequest.Key.Hash, ChunkRequest.ChunkId); + } + + ++RequestIndex; + } + + if (!UpstreamRequests.empty() && m_UpstreamCache) + { + const auto OnCachePayloadGetComplete = [this, &ChunkRequests, &Chunks](CachePayloadGetCompleteParams&& Params) { + if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Params.Payload))) + { + auto InsertResult = m_CidStore.AddChunk(Compressed); + + ZEN_DEBUG("HIT - '{}/{}/{}' {} ({})", + Params.Request.Key.Bucket, + Params.Request.Key.Hash, + Params.Request.ChunkId, + NiceBytes(Params.Payload.GetSize()), + "UPSTREAM"); + + Chunks[Params.RequestIndex] = std::move(Params.Payload); + + m_CacheStats.HitCount++; + m_CacheStats.UpstreamHitCount++; + } + else + { + ZEN_DEBUG("MISS - '{}/{}/{}'", Params.Request.Key.Bucket, Params.Request.Key.Hash, Params.Request.ChunkId); + m_CacheStats.MissCount++; + } + }; + + m_UpstreamCache->GetCachePayloads(ChunkRequests, UpstreamRequests, std::move(OnCachePayloadGetComplete)); + } + + CbPackage BatchResponse; + CbObjectWriter ResponseObject; + + ResponseObject.BeginArray("Result"sv); + + for (size_t ChunkIndex = 0; ChunkIndex < Chunks.size(); ++ChunkIndex) + { + if (Chunks[ChunkIndex]) + { + ResponseObject << ChunkRequests[ChunkIndex].ChunkId; + BatchResponse.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(std::move(Chunks[ChunkIndex]))))); + } + else + { + ResponseObject << IoHash::Zero; + } + } + ResponseObject.EndArray(); + + BatchResponse.SetObject(ResponseObject.Save()); + + BinaryWriter MemStream; + BatchResponse.Save(MemStream); + + Request.WriteResponse(HttpResponseCode::OK, + HttpContentType::kCbPackage, + IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize())); +} + +void HttpStructuredCacheService::HandleStatsRequest(zen::HttpServerRequest& Request) { CbObjectWriter Cbo; |