diff options
| author | Per Larsson <[email protected]> | 2021-11-09 13:20:00 +0100 |
|---|---|---|
| committer | Per Larsson <[email protected]> | 2021-11-09 13:20:00 +0100 |
| commit | e0d54396fa3ba0f5466a4ea1f2810721c18fa55f (patch) | |
| tree | 4ab63a83dac7e1e9245d62be1918d12f4a55ae8d | |
| parent | Added batched get chunk(s). (diff) | |
| download | zen-e0d54396fa3ba0f5466a4ea1f2810721c18fa55f.tar.xz zen-e0d54396fa3ba0f5466a4ea1f2810721c18fa55f.zip | |
Sort cache keys when resolving payload ID's.
| -rw-r--r-- | zenserver/cache/cachekey.cpp | 2 | ||||
| -rw-r--r-- | zenserver/cache/cachekey.h | 64 | ||||
| -rw-r--r-- | zenserver/cache/structuredcache.cpp | 156 | ||||
| -rw-r--r-- | zenserver/cache/structuredcache.h | 2 | ||||
| -rw-r--r-- | zenserver/upstream/upstreamcache.cpp | 48 | ||||
| -rw-r--r-- | zenserver/upstream/upstreamcache.h | 4 |
6 files changed, 228 insertions, 48 deletions
diff --git a/zenserver/cache/cachekey.cpp b/zenserver/cache/cachekey.cpp index 94ef7fd12..57993d424 100644 --- a/zenserver/cache/cachekey.cpp +++ b/zenserver/cache/cachekey.cpp @@ -100,6 +100,6 @@ ParseSkipCachePolicy(std::string_view SkipPolicy, CachePolicy Default) return Result; } -CacheKey CacheKey::None = CacheKey{.Bucket = std::string(), .Hash = IoHash()}; +const CacheKey CacheKey::Empty = CacheKey{.Bucket = std::string(), .Hash = IoHash()}; } // namespace zen diff --git a/zenserver/cache/cachekey.h b/zenserver/cache/cachekey.h index 012a01292..5b67b0261 100644 --- a/zenserver/cache/cachekey.h +++ b/zenserver/cache/cachekey.h @@ -44,15 +44,67 @@ struct CacheKey static CacheKey Create(std::string_view Bucket, const IoHash& Hash) { return {.Bucket = ToLower(Bucket), .Hash = Hash}; } - static CacheKey None; + static const CacheKey Empty; }; -struct CacheChunk +inline bool +operator==(const CacheKey& A, const CacheKey& B) { - CacheKey Key; - IoHash Id; - uint64_t RawOffset = 0ull; - uint64_t RawSize = ~uint64_t(0); + return A.Bucket == B.Bucket && A.Hash == B.Hash; +} + +inline bool +operator!=(const CacheKey& A, const CacheKey& B) +{ + return A.Bucket != B.Bucket || A.Hash != B.Hash; +} + +inline bool +operator<(const CacheKey& A, const CacheKey& B) +{ + const std::string& BucketA = A.Bucket; + const std::string& BucketB = B.Bucket; + return BucketA == BucketB ? A.Hash < B.Hash : BucketA < BucketB; +} + +struct CacheChunkRequest +{ + CacheKey Key; + IoHash ChunkId; + Oid PayloadId; + uint64_t RawOffset = 0ull; + uint64_t RawSize = ~uint64_t(0); + CachePolicy Policy = CachePolicy::Default; }; +inline bool +operator<(const CacheChunkRequest& A, const CacheChunkRequest& B) +{ + if (A.Key < B.Key) + { + return true; + } + if (B.Key < A.Key) + { + return false; + } + if (A.ChunkId < B.ChunkId) + { + return true; + } + if (B.ChunkId < A.ChunkId) + { + return false; + } + if (A.PayloadId < B.PayloadId) + { + return true; + } + if (B.PayloadId < A.PayloadId) + { + return false; + } + return A.RawOffset < B.RawOffset; +} + } // namespace zen diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp index a1b0e1549..decad2f04 100644 --- a/zenserver/cache/structuredcache.cpp +++ b/zenserver/cache/structuredcache.cpp @@ -809,7 +809,7 @@ HttpStructuredCacheService::HandleBatchRequest(zen::HttpServerRequest& Request, } else if (Method == "get-cache-chunks"sv) { - HandleBatchGetCacheChunks(Request, BatchRequest, Policy); + HandleBatchGetCachePayloads(Request, BatchRequest, Policy); } else { @@ -848,6 +848,11 @@ HttpStructuredCacheService::HandleBatchGetCacheRecords(zen::HttpServerRequest& R CacheKeys.push_back(CacheKey::Create(Query["bucket"sv].AsString(), Query["hash"sv].AsHash())); } + if (CacheKeys.empty()) + { + return Request.WriteResponse(HttpResponseCode::BadRequest); + } + CacheValues.resize(CacheKeys.size()); for (size_t Idx = 0; const CacheKey& Key : CacheKeys) @@ -867,7 +872,14 @@ HttpStructuredCacheService::HandleBatchGetCacheRecords(zen::HttpServerRequest& R }); } + ZEN_DEBUG("HIT - '{}/{}' {} '{}' (LOCAL)", + Key.Bucket, + Key.Hash, + NiceBytes(CacheValue.Value.Size()), + ToString(CacheValue.Value.GetContentType())); + CacheValues[Idx] = CacheValue.Value; + m_CacheStats.HitCount++; } else { @@ -882,8 +894,9 @@ HttpStructuredCacheService::HandleBatchGetCacheRecords(zen::HttpServerRequest& R auto UpstreamResult = m_UpstreamCache->GetCacheRecords( CacheKeys, Missing, - [this, &CacheValues, &Payloads, SkipAttachments](size_t KeyIndex, IoBuffer UpstreamValue) { - CbPackage UpstreamPackage; + [this, &CacheKeys, &CacheValues, &Payloads, SkipAttachments](size_t KeyIndex, IoBuffer UpstreamValue) { + const CacheKey& Key = CacheKeys[KeyIndex]; + CbPackage UpstreamPackage; if (UpstreamValue && UpstreamPackage.TryLoad(UpstreamValue)) { CbObjectView CacheRecord = UpstreamPackage.GetObject(); @@ -903,9 +916,25 @@ HttpStructuredCacheService::HandleBatchGetCacheRecords(zen::HttpServerRequest& R } }); + ZEN_DEBUG("HIT - '{}/{}' {} '{}' (UPSTREAM)", + Key.Bucket, + Key.Hash, + NiceBytes(UpstreamValue.Size()), + ToString(UpstreamValue.GetContentType())); + CacheValues[KeyIndex] = IoBufferBuilder::MakeCloneFromMemory(CacheRecord.GetView()); + m_CacheStats.UpstreamHitCount++; } }); + + Missing = std::move(UpstreamResult.Missing); + } + + for (size_t Idx : Missing) + { + const CacheKey& Key = CacheKeys[Idx]; + ZEN_DEBUG("MISS - '{}/{}'", Key.Bucket, Key.Hash); + m_CacheStats.MissCount++; } CbObjectWriter BatchResponse; @@ -942,19 +971,10 @@ HttpStructuredCacheService::HandleBatchGetCacheRecords(zen::HttpServerRequest& R } void -HttpStructuredCacheService::HandleBatchGetCacheChunks(zen::HttpServerRequest& Request, CbObjectView BatchRequest, CachePolicy Policy) +HttpStructuredCacheService::HandleBatchGetCachePayloads(zen::HttpServerRequest& Request, CbObjectView BatchRequest, CachePolicy Policy) { using namespace fmt::literals; - const bool SkipData = (Policy & CachePolicy::SkipData) == CachePolicy::SkipData; - const bool SkipAttachments = (Policy & CachePolicy::SkipAttachments) == CachePolicy::SkipAttachments; - const bool QueryUpstream = m_UpstreamCache && ((Policy & CachePolicy::QueryRemote) == CachePolicy::QueryRemote); - - const std::string_view Method = BatchRequest["method"sv].AsString(); - ZEN_ASSERT(Method == "get-cache-chunks"sv); - - CbObjectView Params = BatchRequest["params"sv].AsObjectView(); - const auto GetChunkIdFromPayloadId = [](CbObjectView Record, const Oid& PayloadId) -> IoHash { if (CbObjectView ValueObject = Record["Value"].AsObjectView()) { @@ -979,68 +999,126 @@ HttpStructuredCacheService::HandleBatchGetCacheChunks(zen::HttpServerRequest& Re return IoHash::Zero; }; - std::vector<CacheChunk> CacheChunks; - std::vector<CachePolicy> ChunkPolicies; - std::vector<size_t> Missing; + const bool SkipData = (Policy & CachePolicy::SkipData) == CachePolicy::SkipData; + const bool SkipAttachments = (Policy & CachePolicy::SkipAttachments) == CachePolicy::SkipAttachments; + const bool QueryUpstream = m_UpstreamCache && ((Policy & CachePolicy::QueryRemote) == CachePolicy::QueryRemote); + + const std::string_view Method = BatchRequest["method"sv].AsString(); + ZEN_ASSERT(Method == "get-cache-chunks"sv); + + CbObjectView Params = BatchRequest["params"sv].AsObjectView(); + + std::vector<CacheChunkRequest> ChunkRequests; + std::vector<size_t> Missing; - for (CbFieldView ChunkView : Params["cachechunks"sv]) + for (CbFieldView ChunkView : Params["chunkrequests"sv]) { CbObjectView Chunk = ChunkView.AsObjectView(); CbObjectView CacheKeyObject = Chunk["key"sv].AsObjectView(); const CacheKey Key = CacheKey::Create(CacheKeyObject["bucket"sv].AsString(), CacheKeyObject["hash"sv].AsHash()); - const Oid PayloadId = Chunk["id"sv].AsObjectId(); + const IoHash ChunkId = IoHash::Zero; + const Oid PayloadId = Chunk["payloadid"sv].AsObjectId(); const uint64_t RawOffset = Chunk["rawoffset"sv].AsUInt64(); const uint64_t RawSize = Chunk["rawsize"sv].AsUInt64(); const uint32_t ChunkPolicy = Chunk["policy"sv].AsUInt32(); - IoHash ChunkId = IoHash::Zero; + ChunkRequests.emplace_back(Key, ChunkId, PayloadId, RawOffset, RawSize, static_cast<CachePolicy>(ChunkPolicy)); + } - ZenCacheValue CacheValue; - if (m_CacheStore.Get(Key.Bucket, Key.Hash, CacheValue)) + std::stable_sort(ChunkRequests.begin(), ChunkRequests.end()); + + CacheKey CurrentKey = CacheKey::Empty; + IoBuffer CurrentRecordBuffer; + + for (CacheChunkRequest& ChunkRequest : ChunkRequests) + { + if (ChunkRequest.Key != CurrentKey) { - ChunkId = GetChunkIdFromPayloadId(CbObjectView(CacheValue.Value.Data()), PayloadId); + CurrentKey = ChunkRequest.Key; + + ZenCacheValue CacheValue; + if (m_CacheStore.Get(ChunkRequest.Key.Bucket, ChunkRequest.Key.Hash, CacheValue)) + { + CurrentRecordBuffer = CacheValue.Value; + } } - CacheChunks.emplace_back(Key, ChunkId, RawOffset, RawSize); - ChunkPolicies.emplace_back(static_cast<CachePolicy>(ChunkPolicy)); + if (CurrentRecordBuffer) + { + ChunkRequest.ChunkId = GetChunkIdFromPayloadId(CbObjectView(CurrentRecordBuffer.GetData()), ChunkRequest.PayloadId); + } } - if (CacheChunks.empty()) + if (ChunkRequests.empty()) { return Request.WriteResponse(HttpResponseCode::BadRequest); } std::vector<IoBuffer> Chunks; - Chunks.resize(CacheChunks.size()); + Chunks.resize(ChunkRequests.size()); - for (size_t Idx = 0; CacheChunk & CacheChunk : CacheChunks) + for (size_t Idx = 0; const CacheChunkRequest& ChunkRequest : ChunkRequests) { - if (IoBuffer Chunk = m_CidStore.FindChunkByCid(CacheChunk.Id)) + if (IoBuffer Chunk = m_CidStore.FindChunkByCid(ChunkRequest.ChunkId)) { ZEN_DEBUG("HIT - '{}/{}/{}' {} '{}' ({})", - CacheChunk.Key.Bucket, - CacheChunk.Key.Hash, - CacheChunk.Id, + ChunkRequest.Key.Bucket, + ChunkRequest.Key.Hash, + ChunkRequest.ChunkId, NiceBytes(Chunk.Size()), ToString(Chunk.GetContentType()), "LOCAL"); Chunks[Idx] = Chunk; + m_CacheStats.HitCount++; } else { - ZEN_DEBUG("MISS - '{}/{}/{}' '{}'", - CacheChunk.Key.Bucket, - CacheChunk.Key.Hash, - CacheChunk.Id, - ToString(HttpContentType::kCompressedBinary)); - - CacheChunk.Id = IoHash::Zero; Missing.push_back(Idx); } ++Idx; } + if (!Missing.empty() && QueryUpstream) + { + auto UpstreamResult = m_UpstreamCache->GetCachePayloads( + ChunkRequests, + Missing, + [this, &ChunkRequests, &Chunks](size_t ChunkIndex, IoBuffer UpstreamValue) { + const CacheChunkRequest& ChunkRequest = ChunkRequests[ChunkIndex]; + + if (CompressedBuffer CompressedChunk = CompressedBuffer::FromCompressed(SharedBuffer(UpstreamValue))) + { + auto InsertResult = m_CidStore.AddChunk(CompressedChunk); + IoBuffer Chunk = CompressedChunk.GetCompressed().Flatten().AsIoBuffer(); + + ZEN_DEBUG("HIT - '{}/{}/{}' {} '{}' ({})", + ChunkRequest.Key.Bucket, + ChunkRequest.Key.Hash, + ChunkRequest.ChunkId, + NiceBytes(Chunk.GetSize()), + ToString(Chunk.GetContentType()), + "UPSTREAM"); + + Chunks[ChunkIndex] = Chunk; + m_CacheStats.UpstreamHitCount++; + } + else + { + ZEN_WARN("got uncompressed upstream cache payload"); + } + }); + + Missing = std::move(UpstreamResult.Missing); + } + + for (size_t Idx : Missing) + { + const CacheChunkRequest& ChunkRequest = ChunkRequests[Idx]; + ZEN_DEBUG("MISS - '{}/{}/{}'", ChunkRequest.Key.Bucket, ChunkRequest.Key.Hash, ChunkRequest.ChunkId); + m_CacheStats.MissCount++; + } + CbPackage Package; CbObjectWriter BatchResponse; @@ -1050,7 +1128,7 @@ HttpStructuredCacheService::HandleBatchGetCacheChunks(zen::HttpServerRequest& Re { if (Chunks[Idx]) { - BatchResponse << CacheChunks[Idx].Id; + BatchResponse << ChunkRequests[Idx].ChunkId; Package.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(std::move(Chunks[Idx]))))); } else diff --git a/zenserver/cache/structuredcache.h b/zenserver/cache/structuredcache.h index fe4453c2b..1ff4f28c9 100644 --- a/zenserver/cache/structuredcache.h +++ b/zenserver/cache/structuredcache.h @@ -91,7 +91,7 @@ private: void HandlePutCachePayload(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy); void HandleBatchRequest(zen::HttpServerRequest& Request, CachePolicy Policy); void HandleBatchGetCacheRecords(zen::HttpServerRequest& Request, CbObjectView BatchRequest, CachePolicy Policy); - void HandleBatchGetCacheChunks(zen::HttpServerRequest& Request, CbObjectView BatchRequest, CachePolicy Policy); + void HandleBatchGetCachePayloads(zen::HttpServerRequest& Request, CbObjectView BatchRequest, CachePolicy Policy); void HandleCacheBucketRequest(zen::HttpServerRequest& Request, std::string_view Bucket); virtual void HandleStatsRequest(zen::HttpServerRequest& Request) override; virtual void HandleStatusRequest(zen::HttpServerRequest& Request) override; diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp index 437b29cd7..7ef0caf62 100644 --- a/zenserver/upstream/upstreamcache.cpp +++ b/zenserver/upstream/upstreamcache.cpp @@ -782,7 +782,7 @@ public: virtual GetUpstreamCacheBatchResult GetCacheRecords(std::span<CacheKey> CacheKeys, std::span<size_t> KeyIndex, - OnCacheGetComplete OnComplete) override + OnCacheGetComplete OnComplete) override final { if (!m_Options.ReadUpstream) { @@ -801,6 +801,52 @@ public: if (Endpoint->IsHealthy()) { CacheResult = Endpoint->GetCacheRecord(CacheKey, ZenContentType::kCbPackage); + m_Stats.Add(m_Log, *Endpoint, CacheResult, m_Endpoints); + + if (CacheResult.Success) + { + break; + } + } + } + + if (CacheResult.Success) + { + OnComplete(Idx, CacheResult.Value); + } + else + { + Result.Missing.push_back(Idx); + } + } + + return Result; + } + + virtual GetUpstreamCacheBatchResult GetCachePayloads(std::span<CacheChunkRequest> CacheChunkRequests, + std::span<size_t> ChunkIndex, + OnCacheGetComplete OnComplete) override final + { + if (!m_Options.ReadUpstream) + { + return {.Missing = std::vector<size_t>(ChunkIndex.begin(), ChunkIndex.end())}; + } + + GetUpstreamCacheBatchResult Result; + + for (size_t Idx : ChunkIndex) + { + const CacheChunkRequest& Chunk = CacheChunkRequests[Idx]; + UpstreamPayloadKey PayloadKey{{Chunk.Key.Bucket, Chunk.Key.Hash}, Chunk.ChunkId}; + + GetUpstreamCacheResult CacheResult; + for (auto& Endpoint : m_Endpoints) + { + if (Endpoint->IsHealthy()) + { + CacheResult = Endpoint->GetCachePayload(PayloadKey); + m_Stats.Add(m_Log, *Endpoint, CacheResult, m_Endpoints); + if (CacheResult.Success) { break; diff --git a/zenserver/upstream/upstreamcache.h b/zenserver/upstream/upstreamcache.h index 04554f210..a7bae302d 100644 --- a/zenserver/upstream/upstreamcache.h +++ b/zenserver/upstream/upstreamcache.h @@ -145,6 +145,10 @@ public: virtual GetUpstreamCacheResult GetCachePayload(UpstreamPayloadKey PayloadKey) = 0; + virtual GetUpstreamCacheBatchResult GetCachePayloads(std::span<CacheChunkRequest> CacheChunkRequests, + std::span<size_t> RequestIndex, + OnCacheGetComplete OnComplete) = 0; + struct EnqueueResult { bool Success = false; |