diff options
| author | Dan Engelbrecht <[email protected]> | 2022-12-07 11:21:41 +0100 |
|---|---|---|
| committer | GitHub <[email protected]> | 2022-12-07 02:21:41 -0800 |
| commit | 100c8f966b1c5b2fb190748f0177600562d1c5fe (patch) | |
| tree | fc85e350dea47330149a1d42eb7a6c7ae0a06111 /zenserver/cache/structuredcache.cpp | |
| parent | Cache request record/replay (#198) (diff) | |
| download | zen-100c8f966b1c5b2fb190748f0177600562d1c5fe.tar.xz zen-100c8f966b1c5b2fb190748f0177600562d1c5fe.zip | |
optimizations (#200)
* Use direct file read and direct buffer allocation for small IoBuffer materalization
* Reduce range of materialized data in CompositeBuffer reading
CompressedBuffer header reading often only need a small part and not the whole file
* reduce lock contention in IoBuffer::Materialize
* Reduce parsing of compressed headers
Validate header type at decompression
* faster CreateDirectories - start from leaf going up and recurse back
* optimized BufferHeader::IsValid
* Add ValidateCompressedHeader to use when we don't need the actual compressed data
Validate that we always get compressed data in CidStore::AddChunk
* changelog
Diffstat (limited to 'zenserver/cache/structuredcache.cpp')
| -rw-r--r-- | zenserver/cache/structuredcache.cpp | 121 |
1 files changed, 66 insertions, 55 deletions
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp index baaf94dd0..f649efa01 100644 --- a/zenserver/cache/structuredcache.cpp +++ b/zenserver/cache/structuredcache.cpp @@ -747,8 +747,8 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request { if (IoBuffer Chunk = m_CidStore.FindChunkByCid(AttachmentHash.AsHash())) { - Package.AddAttachment( - CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Chunk)), AttachmentHash.AsHash())); + CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(std::move(Chunk)); + Package.AddAttachment(CbAttachment(Compressed, AttachmentHash.AsHash())); } else { @@ -907,7 +907,7 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request } else if (IoBuffer Chunk = m_CidStore.FindChunkByCid(Hash)) { - CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Chunk)); + CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(std::move(Chunk)); if (Compressed) { Package.AddAttachment(CbAttachment(Compressed, Hash)); @@ -939,8 +939,9 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request for (const CbAttachment* Attachment : AttachmentsToStoreLocally) { - CompressedBuffer Chunk = Attachment->AsCompressedBinary(); - CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Chunk); + CompressedBuffer Chunk = Attachment->AsCompressedBinary(); + CidStore::InsertResult InsertResult = + m_CidStore.AddChunk(Chunk.GetCompressed().Flatten().AsIoBuffer(), Attachment->GetHash()); if (InsertResult.New) { Count.New++; @@ -1173,7 +1174,7 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request for (const CbAttachment* Attachment : AttachmentsToStoreLocally) { CompressedBuffer Chunk = Attachment->AsCompressedBinary(); - CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Chunk); + CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Chunk.GetCompressed().Flatten().AsIoBuffer(), Attachment->GetHash()); if (InsertResult.New) { Count.New++; @@ -1243,10 +1244,19 @@ HttpStructuredCacheService::HandleGetCacheChunk(zen::HttpServerRequest& Request, m_UpstreamCache.GetCacheChunk(Ref.Namespace, {Ref.BucketSegment, Ref.HashKey}, Ref.ValueContentId); UpstreamResult.Status.Success) { - if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(UpstreamResult.Value))) + IoHash RawHash; + uint64_t RawSize; + if (CompressedBuffer::ValidateCompressedHeader(UpstreamResult.Value, RawHash, RawSize)) { - m_CidStore.AddChunk(Compressed); - Source = UpstreamResult.Source; + if (RawHash == Ref.ValueContentId) + { + m_CidStore.AddChunk(UpstreamResult.Value, RawHash); + Source = UpstreamResult.Source; + } + else + { + ZEN_WARN("got missmatching upstream cache value"); + } } else { @@ -1312,21 +1322,21 @@ HttpStructuredCacheService::HandlePutCacheChunk(zen::HttpServerRequest& Request, Body.SetContentType(Request.RequestContentType()); - CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Body)); - - if (!Compressed) + IoHash RawHash; + uint64_t RawSize; + if (!CompressedBuffer::ValidateCompressedHeader(Body, RawHash, RawSize)) { return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Attachments must be compressed"sv); } - if (IoHash::FromBLAKE3(Compressed.GetRawHash()) != Ref.ValueContentId) + if (RawHash != Ref.ValueContentId) { return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "ValueContentId does not match attachment hash"sv); } - CidStore::InsertResult Result = m_CidStore.AddChunk(Compressed); + CidStore::InsertResult Result = m_CidStore.AddChunk(Body, RawHash); ZEN_DEBUG("PUTCACHECHUNK - '{}/{}/{}/{}' {} '{}' ({}) in {}", Ref.Namespace, @@ -1625,7 +1635,7 @@ HttpStructuredCacheService::PutCacheRecord(PutRequestData& Request, const CbPack for (const CbAttachment* Attachment : AttachmentsToStoreLocally) { CompressedBuffer Chunk = Attachment->AsCompressedBinary(); - CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Chunk); + CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Chunk.GetCompressed().Flatten().AsIoBuffer(), Attachment->GetHash()); if (InsertResult.New) { Count.New++; @@ -1788,7 +1798,7 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(CbObjectView RpcRequest) if (IoBuffer Chunk = m_CidStore.FindChunkByCid(Value.ContentId)) { ZEN_ASSERT(Chunk.GetSize() > 0); - Value.Payload = CompressedBuffer::FromCompressed(SharedBuffer(Chunk)); + Value.Payload = CompressedBuffer::FromCompressedNoValidate(std::move(Chunk)); Value.Exists = true; } else @@ -1900,7 +1910,7 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(CbObjectView RpcRequest) Value.Exists = true; if (EnumHasAllFlags(ValuePolicy, CachePolicy::StoreLocal)) { - m_CidStore.AddChunk(Compressed); + m_CidStore.AddChunk(Compressed.GetCompressed().Flatten().AsIoBuffer(), Attachment->GetHash()); } if (!EnumHasAllFlags(ValuePolicy, CachePolicy::SkipData)) { @@ -1947,7 +1957,7 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(CbObjectView RpcRequest) { if (!EnumHasAllFlags(Value.DownstreamPolicy, CachePolicy::SkipData) && Value.Payload) { - ResponsePackage.AddAttachment(CbAttachment(Value.Payload, IoHash::FromBLAKE3(Value.Payload.GetRawHash()))); + ResponsePackage.AddAttachment(CbAttachment(Value.Payload, Value.ContentId)); } } @@ -2139,25 +2149,24 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(CbObjectView RpcRequest) PolicyText = RequestObject["Policy"sv].AsString(); Request.Policy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : DefaultPolicy; - CacheKey& Key = Request.Key; - CachePolicy Policy = Request.Policy; - CompressedBuffer& Result = Request.Result; + CacheKey& Key = Request.Key; + CachePolicy Policy = Request.Policy; ZenCacheValue CacheValue; if (EnumHasAllFlags(Policy, CachePolicy::QueryLocal)) { if (m_CacheStore.Get(*Namespace, Key.Bucket, Key.Hash, CacheValue) && IsCompressedBinary(CacheValue.Value.GetContentType())) { - Result = CompressedBuffer::FromCompressed(SharedBuffer(CacheValue.Value)); + Request.Result = CompressedBuffer::FromCompressed(SharedBuffer(CacheValue.Value), Request.RawHash, Request.RawSize); } } - if (Result) + if (Request.Result) { ZEN_DEBUG("GETCACHEVALUES HIT - '{}/{}/{}' {} ({}) in {}", *Namespace, Key.Bucket, Key.Hash, - NiceBytes(Result.GetCompressed().GetSize()), + NiceBytes(Request.Result.GetCompressed().GetSize()), "LOCAL"sv, NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); m_CacheStats.HitCount++; @@ -2216,7 +2225,7 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(CbObjectView RpcRequest) { if (HasData && !SkipData) { - Request.Result = CompressedBuffer::FromCompressed(SharedBuffer(Params.Value)); + Request.Result = CompressedBuffer::FromCompressed(SharedBuffer(Params.Value), Request.RawHash, Request.RawSize); } if (HasData && StoreData) @@ -2261,15 +2270,14 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(CbObjectView RpcRequest) const CompressedBuffer& Result = Request.Result; if (Result) { - IoHash Hash = IoHash::FromBLAKE3(Result.GetRawHash()); - ResponseObject.AddHash("RawHash"sv, Hash); + ResponseObject.AddHash("RawHash"sv, Request.RawHash); if (!EnumHasAllFlags(Request.Policy, CachePolicy::SkipData)) { - RpcResponse.AddAttachment(CbAttachment(Result, Hash)); + RpcResponse.AddAttachment(CbAttachment(Result, Request.RawHash)); } else { - ResponseObject.AddInteger("RawSize"sv, Result.GetRawSize()); + ResponseObject.AddInteger("RawSize"sv, Request.RawSize); } } else if (Request.RawHash != IoHash::Zero) @@ -2310,12 +2318,12 @@ namespace cache::detail { RecordBody* Record = nullptr; CompressedBuffer Value; const UpstreamEndpointInfo* Source = nullptr; - uint64_t TotalSize = 0; + uint64_t RawSize = 0; uint64_t RequestedSize = 0; uint64_t RequestedOffset = 0; CachePolicy DownstreamPolicy; bool Exists = false; - bool TotalSizeKnown = false; + bool RawSizeKnown = false; bool IsRecordRequest = false; uint64_t ElapsedTimeUs = 0; }; @@ -2569,9 +2577,9 @@ HttpStructuredCacheService::GetLocalCacheRecords(std::string_view Namespac { if (Value.ValueId == Request->Key->ValueId) { - Request->Key->ChunkId = Value.ContentId; - Request->TotalSize = Value.RawSize; - Request->TotalSizeKnown = true; + Request->Key->ChunkId = Value.ContentId; + Request->RawSize = Value.RawSize; + Request->RawSizeKnown = true; break; } } @@ -2582,7 +2590,7 @@ HttpStructuredCacheService::GetLocalCacheRecords(std::string_view Namespac { if (EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::QueryLocal)) { - if (EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::SkipData) && Request->TotalSizeKnown) + if (EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::SkipData) && Request->RawSizeKnown) { if (m_CidStore.ContainsChunk(Request->Key->ChunkId)) { @@ -2591,16 +2599,16 @@ HttpStructuredCacheService::GetLocalCacheRecords(std::string_view Namespac } else if (IoBuffer Payload = m_CidStore.FindChunkByCid(Request->Key->ChunkId)) { - CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Payload)); + IoHash RawHash; + CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Payload), RawHash, Request->RawSize); if (Compressed) { if (!EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::SkipData)) { Request->Value = Compressed; } - Request->Exists = true; - Request->TotalSize = Compressed.GetRawSize(); - Request->TotalSizeKnown = true; + Request->Exists = true; + Request->RawSizeKnown = true; } } } @@ -2631,17 +2639,19 @@ HttpStructuredCacheService::GetLocalCacheValues(std::string_view Namespa { if (IsCompressedBinary(CacheValue.Value.GetContentType())) { - CompressedBuffer Result = CompressedBuffer::FromCompressed(SharedBuffer(CacheValue.Value)); + IoHash RawHash; + uint64_t RawSize; + CompressedBuffer Result = CompressedBuffer::FromCompressed(SharedBuffer(CacheValue.Value), RawHash, RawSize); if (Result) { if (!EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::SkipData)) { Request->Value = Result; } - Request->Key->ChunkId = IoHash::FromBLAKE3(Result.GetRawHash()); - Request->Exists = true; - Request->TotalSize = Result.GetRawSize(); - Request->TotalSizeKnown = true; + Request->Key->ChunkId = RawHash; + Request->Exists = true; + Request->RawSize = RawSize; + Request->RawSizeKnown = true; } } } @@ -2683,9 +2693,10 @@ HttpStructuredCacheService::GetUpstreamCacheChunks(std::string_view Names if (EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::StoreLocal) || !EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::SkipData)) { - CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Params.Value)); - if (!Compressed || Compressed.GetRawSize() != Params.RawSize || - IoHash::FromBLAKE3(Compressed.GetRawHash()) != Params.RawHash) + IoHash RawHash; + uint64_t RawSize; + CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Params.Value), RawHash, RawSize); + if (!Compressed || RawSize != Params.RawSize || RawHash != Params.RawHash) { return; } @@ -2694,7 +2705,7 @@ HttpStructuredCacheService::GetUpstreamCacheChunks(std::string_view Names { if (Request.IsRecordRequest) { - m_CidStore.AddChunk(Compressed); + m_CidStore.AddChunk(Params.Value, RawHash); } else { @@ -2706,11 +2717,11 @@ HttpStructuredCacheService::GetUpstreamCacheChunks(std::string_view Names Request.Value = std::move(Compressed); } } - Key.ChunkId = Params.RawHash; - Request.Exists = true; - Request.TotalSize = Params.RawSize; - Request.TotalSizeKnown = true; - Request.Source = Params.Source; + Key.ChunkId = Params.RawHash; + Request.Exists = true; + Request.RawSize = Params.RawSize; + Request.RawSizeKnown = true; + Request.Source = Params.Source; m_CacheStats.UpstreamHitCount++; }; @@ -2741,7 +2752,7 @@ HttpStructuredCacheService::WriteGetCacheChunksResponse(std::string_view Namespa } else { - Writer.AddInteger("RawSize"sv, Request.TotalSize); + Writer.AddInteger("RawSize"sv, Request.RawSize); } ZEN_DEBUG("GETCACHECHUNKS HIT - '{}/{}/{}/{}' {} '{}' ({}) in {}", @@ -2749,7 +2760,7 @@ HttpStructuredCacheService::WriteGetCacheChunksResponse(std::string_view Namespa Request.Key->Key.Bucket, Request.Key->Key.Hash, Request.Key->ValueId, - NiceBytes(Request.TotalSize), + NiceBytes(Request.RawSize), Request.IsRecordRequest ? "Record"sv : "Value"sv, Request.Source ? Request.Source->Url : "LOCAL"sv, NiceLatencyNs(Request.ElapsedTimeUs * 1000)); |