diff options
| author | Dan Engelbrecht <[email protected]> | 2022-11-18 12:02:41 +0100 |
|---|---|---|
| committer | Dan Engelbrecht <[email protected]> | 2022-11-25 10:17:34 +0100 |
| commit | 5041f3521c2b3074032c06ed04a391ed90a06c7e (patch) | |
| tree | fbe6c18c79de5abff79394b69d65e38004d9e39e /zenserver/cache/structuredcache.cpp | |
| parent | 0.1.9 (diff) | |
| download | zen-5041f3521c2b3074032c06ed04a391ed90a06c7e.tar.xz zen-5041f3521c2b3074032c06ed04a391ed90a06c7e.zip | |
reduce parsing of compressed headers
Diffstat (limited to 'zenserver/cache/structuredcache.cpp')
| -rw-r--r-- | zenserver/cache/structuredcache.cpp | 204 |
1 files changed, 109 insertions, 95 deletions
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp index dfb69c0fe..d1630a4af 100644 --- a/zenserver/cache/structuredcache.cpp +++ b/zenserver/cache/structuredcache.cpp @@ -530,8 +530,8 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request { if (IoBuffer Chunk = m_CidStore.FindChunkByCid(AttachmentHash.AsHash())) { - Package.AddAttachment( - CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Chunk)), AttachmentHash.AsHash())); + CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(std::move(Chunk)); + Package.AddAttachment(CbAttachment(Compressed, AttachmentHash.AsHash())); } else { @@ -588,12 +588,12 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request } else if (!EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryRemote)) { - ZEN_DEBUG("GETCACHERECORD MISS - '{}/{}/{}' '{}' in {}", - Ref.Namespace, - Ref.BucketSegment, - Ref.HashKey, - ToString(AcceptType), - NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); + ZEN_INFO("GETCACHERECORD MISS - '{}/{}/{}' '{}' in {}", + Ref.Namespace, + Ref.BucketSegment, + Ref.HashKey, + ToString(AcceptType), + NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); m_CacheStats.MissCount++; return Request.WriteResponse(HttpResponseCode::NotFound); } @@ -655,13 +655,15 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request AttachmentCount Count; CacheRecord.IterateAttachments([this, &Package, &Ref, &Count, QueryLocal, StoreLocal](CbFieldView HashView) { - if (const CbAttachment* Attachment = Package.FindAttachment(HashView.AsHash())) + IoHash Hash = HashView.AsHash(); + if (const CbAttachment* Attachment = Package.FindAttachment(Hash)) { if (CompressedBuffer Compressed = Attachment->AsCompressedBinary()) { if (StoreLocal) { - auto InsertResult = m_CidStore.AddChunk(Compressed); + auto InsertResult = + m_CidStore.AddChunk(Compressed.GetCompressed().Flatten().AsIoBuffer(), Attachment->GetHash()); if (InsertResult.New) { Count.New++; @@ -672,7 +674,7 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request else { ZEN_WARN("Uncompressed value '{}' from upstream cache record '{}/{}'", - HashView.AsHash(), + Hash, Ref.BucketSegment, Ref.HashKey); Count.Invalid++; @@ -680,10 +682,9 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request } else if (QueryLocal) { - if (IoBuffer Chunk = m_CidStore.FindChunkByCid(HashView.AsHash())) + if (IoBuffer Chunk = m_CidStore.FindChunkByCid(Hash)) { - Package.AddAttachment( - CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Chunk)), HashView.AsHash())); + Package.AddAttachment(CbAttachment(CompressedBuffer::FromCompressedNoValidate(std::move(Chunk)), Hash)); Count.Valid++; } } @@ -762,12 +763,12 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request } else { - ZEN_DEBUG("GETCACHERECORD MISS - '{}/{}/{}' '{}' in {}", - Ref.Namespace, - Ref.BucketSegment, - Ref.HashKey, - ToString(AcceptType), - NiceLatencyNs((LocalElapsedTimeUs + Timer.GetElapsedTimeUs()) * 1000)); + ZEN_INFO("GETCACHERECORD MISS - '{}/{}/{}' '{}' in {}", + Ref.Namespace, + Ref.BucketSegment, + Ref.HashKey, + ToString(AcceptType), + NiceLatencyNs((LocalElapsedTimeUs + Timer.GetElapsedTimeUs()) * 1000)); m_CacheStats.MissCount++; AsyncRequest.WriteResponse(HttpResponseCode::NotFound); } @@ -889,7 +890,7 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request if (Attachment->IsCompressedBinary()) { CompressedBuffer Chunk = Attachment->AsCompressedBinary(); - CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Chunk); + CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Chunk.GetCompressed().Flatten().AsIoBuffer(), Hash); ValidAttachments.emplace_back(Hash); @@ -992,10 +993,19 @@ HttpStructuredCacheService::HandleGetCacheChunk(zen::HttpServerRequest& Request, m_UpstreamCache.GetCacheChunk(Ref.Namespace, {Ref.BucketSegment, Ref.HashKey}, Ref.ValueContentId); UpstreamResult.Status.Success) { - if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(UpstreamResult.Value))) + IoHash RawHash; + uint64_t RawSize; + if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(UpstreamResult.Value), RawHash, RawSize)) { - m_CidStore.AddChunk(Compressed); - Source = UpstreamResult.Source; + if (RawHash == Ref.ValueContentId) + { + m_CidStore.AddChunk(UpstreamResult.Value, RawHash); + Source = UpstreamResult.Source; + } + else + { + ZEN_WARN("got missmatching upstream cache value"); + } } else { @@ -1007,13 +1017,13 @@ HttpStructuredCacheService::HandleGetCacheChunk(zen::HttpServerRequest& Request, if (!Value) { - ZEN_DEBUG("GETCACHECHUNK MISS - '{}/{}/{}/{}' '{}' in {}", - Ref.Namespace, - Ref.BucketSegment, - Ref.HashKey, - Ref.ValueContentId, - ToString(Request.AcceptContentType()), - NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); + ZEN_INFO("GETCACHECHUNK MISS - '{}/{}/{}/{}' '{}' in {}", + Ref.Namespace, + Ref.BucketSegment, + Ref.HashKey, + Ref.ValueContentId, + ToString(Request.AcceptContentType()), + NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); m_CacheStats.MissCount++; return Request.WriteResponse(HttpResponseCode::NotFound); } @@ -1061,21 +1071,23 @@ HttpStructuredCacheService::HandlePutCacheChunk(zen::HttpServerRequest& Request, Body.SetContentType(Request.RequestContentType()); - CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Body)); + IoHash RawHash; + uint64_t RawSize; + CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Body), RawHash, RawSize); if (!Compressed) { return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Attachments must be compressed"sv); } - if (IoHash::FromBLAKE3(Compressed.GetRawHash()) != Ref.ValueContentId) + if (RawHash != Ref.ValueContentId) { return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "ValueContentId does not match attachment hash"sv); } - CidStore::InsertResult Result = m_CidStore.AddChunk(Compressed); + CidStore::InsertResult Result = m_CidStore.AddChunk(Body, RawHash); ZEN_DEBUG("PUTCACHECHUNK - '{}/{}/{}/{}' {} '{}' ({}) in {}", Ref.Namespace, @@ -1247,8 +1259,9 @@ HttpStructuredCacheService::PutCacheRecord(PutRequestData& Request, const CbPack { if (Attachment->IsCompressedBinary()) { - CompressedBuffer Chunk = Attachment->AsCompressedBinary(); - CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Chunk); + CompressedBuffer Chunk = Attachment->AsCompressedBinary(); + CidStore::InsertResult InsertResult = + m_CidStore.AddChunk(Chunk.GetCompressed().Flatten().AsIoBuffer(), Attachment->GetHash()); ValidAttachments.emplace_back(ValueHash); @@ -1448,7 +1461,7 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Htt if (IoBuffer Chunk = m_CidStore.FindChunkByCid(Value.ContentId)) { ZEN_ASSERT(Chunk.GetSize() > 0); - Value.Payload = CompressedBuffer::FromCompressed(SharedBuffer(Chunk)); + Value.Payload = CompressedBuffer::FromCompressedNoValidate(std::move(Chunk)); Value.Exists = true; } else @@ -1560,7 +1573,7 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Htt Value.Exists = true; if (EnumHasAllFlags(ValuePolicy, CachePolicy::StoreLocal)) { - m_CidStore.AddChunk(Compressed); + m_CidStore.AddChunk(Compressed.GetCompressed().Flatten().AsIoBuffer(), Attachment->GetHash()); } if (!EnumHasAllFlags(ValuePolicy, CachePolicy::SkipData)) { @@ -1607,7 +1620,7 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Htt { if (!EnumHasAllFlags(Value.DownstreamPolicy, CachePolicy::SkipData) && Value.Payload) { - ResponsePackage.AddAttachment(CbAttachment(Value.Payload, IoHash::FromBLAKE3(Value.Payload.GetRawHash()))); + ResponsePackage.AddAttachment(CbAttachment(Value.Payload, Value.ContentId)); } } @@ -1637,13 +1650,13 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Htt } else { - ZEN_DEBUG("GETCACHERECORD MISS - '{}/{}/{}'{} ({}) in {}", - *Namespace, - Key.Bucket, - Key.Hash, - Request.RecordObject ? ""sv : " (PARTIAL)"sv, - Request.Source ? Request.Source->Url : "LOCAL"sv, - NiceLatencyNs(Request.ElapsedTimeUs * 1000)); + ZEN_INFO("GETCACHERECORD MISS - '{}/{}/{}'{} ({}) in {}", + *Namespace, + Key.Bucket, + Key.Hash, + Request.RecordObject ? " (PARTIAL)"sv : ""sv, + Request.Source ? Request.Source->Url : "LOCAL"sv, + NiceLatencyNs(Request.ElapsedTimeUs * 1000)); m_CacheStats.MissCount++; } } @@ -1839,25 +1852,24 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(zen::HttpServerRequest& Http PolicyText = RequestObject["Policy"sv].AsString(); Request.Policy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : DefaultPolicy; - CacheKey& Key = Request.Key; - CachePolicy Policy = Request.Policy; - CompressedBuffer& Result = Request.Result; + CacheKey& Key = Request.Key; + CachePolicy Policy = Request.Policy; ZenCacheValue CacheValue; if (EnumHasAllFlags(Policy, CachePolicy::QueryLocal)) { if (m_CacheStore.Get(*Namespace, Key.Bucket, Key.Hash, CacheValue) && IsCompressedBinary(CacheValue.Value.GetContentType())) { - Result = CompressedBuffer::FromCompressed(SharedBuffer(CacheValue.Value)); + Request.Result = CompressedBuffer::FromCompressed(SharedBuffer(CacheValue.Value), Request.RawHash, Request.RawSize); } } - if (Result) + if (Request.Result) { ZEN_DEBUG("GETCACHEVALUES HIT - '{}/{}/{}' {} ({}) in {}", *Namespace, Key.Bucket, Key.Hash, - NiceBytes(Result.GetCompressed().GetSize()), + NiceBytes(Request.Result.GetCompressed().GetSize()), "LOCAL"sv, NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); m_CacheStats.HitCount++; @@ -1873,12 +1885,12 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(zen::HttpServerRequest& Http } else { - ZEN_DEBUG("GETCACHEVALUES MISS - '{}/{}/{}' ({}) in {}", - *Namespace, - Key.Bucket, - Key.Hash, - "LOCAL"sv, - NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); + ZEN_INFO("GETCACHEVALUES MISS - '{}/{}/{}' ({}) in {}", + *Namespace, + Key.Bucket, + Key.Hash, + "LOCAL"sv, + NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); m_CacheStats.MissCount++; } } @@ -1916,7 +1928,7 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(zen::HttpServerRequest& Http { if (HasData && !SkipData) { - Request.Result = CompressedBuffer::FromCompressed(SharedBuffer(Params.Value)); + Request.Result = CompressedBuffer::FromCompressed(SharedBuffer(Params.Value), Request.RawHash, Request.RawSize); } if (HasData && StoreData) @@ -1961,15 +1973,14 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(zen::HttpServerRequest& Http const CompressedBuffer& Result = Request.Result; if (Result) { - IoHash Hash = IoHash::FromBLAKE3(Result.GetRawHash()); - ResponseObject.AddHash("RawHash"sv, Hash); + ResponseObject.AddHash("RawHash"sv, Request.RawHash); if (!EnumHasAllFlags(Request.Policy, CachePolicy::SkipData)) { - RpcResponse.AddAttachment(CbAttachment(Result, Hash)); + RpcResponse.AddAttachment(CbAttachment(Result, Request.RawHash)); } else { - ResponseObject.AddInteger("RawSize"sv, Result.GetRawSize()); + ResponseObject.AddInteger("RawSize"sv, Request.RawSize); } } else if (Request.RawHash != IoHash::Zero) @@ -2026,12 +2037,12 @@ namespace cache::detail { RecordBody* Record = nullptr; CompressedBuffer Value; const UpstreamEndpointInfo* Source = nullptr; - uint64_t TotalSize = 0; + uint64_t RawSize = 0; uint64_t RequestedSize = 0; uint64_t RequestedOffset = 0; CachePolicy DownstreamPolicy; bool Exists = false; - bool TotalSizeKnown = false; + bool RawSizeKnown = false; bool IsRecordRequest = false; uint64_t ElapsedTimeUs = 0; }; @@ -2303,9 +2314,9 @@ HttpStructuredCacheService::GetLocalCacheRecords(std::string_view Namespac { if (Value.ValueId == Request->Key->ValueId) { - Request->Key->ChunkId = Value.ContentId; - Request->TotalSize = Value.RawSize; - Request->TotalSizeKnown = true; + Request->Key->ChunkId = Value.ContentId; + Request->RawSize = Value.RawSize; + Request->RawSizeKnown = true; break; } } @@ -2316,7 +2327,7 @@ HttpStructuredCacheService::GetLocalCacheRecords(std::string_view Namespac { if (EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::QueryLocal)) { - if (EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::SkipData) && Request->TotalSizeKnown) + if (EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::SkipData) && Request->RawSizeKnown) { if (m_CidStore.ContainsChunk(Request->Key->ChunkId)) { @@ -2325,16 +2336,16 @@ HttpStructuredCacheService::GetLocalCacheRecords(std::string_view Namespac } else if (IoBuffer Payload = m_CidStore.FindChunkByCid(Request->Key->ChunkId)) { - CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Payload)); + CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(std::move(Payload)); if (Compressed) { if (!EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::SkipData)) { Request->Value = Compressed; } - Request->Exists = true; - Request->TotalSize = Compressed.GetRawSize(); - Request->TotalSizeKnown = true; + Request->Exists = true; + Request->RawSize = Compressed.DecodeRawSize(); + Request->RawSizeKnown = true; } } } @@ -2365,17 +2376,19 @@ HttpStructuredCacheService::GetLocalCacheValues(std::string_view Namespa { if (IsCompressedBinary(CacheValue.Value.GetContentType())) { - CompressedBuffer Result = CompressedBuffer::FromCompressed(SharedBuffer(CacheValue.Value)); + IoHash RawHash; + uint64_t RawSize; + CompressedBuffer Result = CompressedBuffer::FromCompressed(SharedBuffer(CacheValue.Value), RawHash, RawSize); if (Result) { if (!EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::SkipData)) { Request->Value = Result; } - Request->Key->ChunkId = IoHash::FromBLAKE3(Result.GetRawHash()); - Request->Exists = true; - Request->TotalSize = Result.GetRawSize(); - Request->TotalSizeKnown = true; + Request->Key->ChunkId = RawHash; + Request->Exists = true; + Request->RawSize = RawSize; + Request->RawSizeKnown = true; } } } @@ -2417,9 +2430,10 @@ HttpStructuredCacheService::GetUpstreamCacheChunks(std::string_view Names if (EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::StoreLocal) || !EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::SkipData)) { - CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Params.Value)); - if (!Compressed || Compressed.GetRawSize() != Params.RawSize || - IoHash::FromBLAKE3(Compressed.GetRawHash()) != Params.RawHash) + IoHash RawHash; + uint64_t RawSize; + CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Params.Value), RawHash, RawSize); + if (!Compressed || RawSize != Params.RawSize || RawHash != Params.RawHash) { return; } @@ -2428,7 +2442,7 @@ HttpStructuredCacheService::GetUpstreamCacheChunks(std::string_view Names { if (Request.IsRecordRequest) { - m_CidStore.AddChunk(Compressed); + m_CidStore.AddChunk(Compressed.GetCompressed().Flatten().AsIoBuffer(), RawHash); } else { @@ -2440,11 +2454,11 @@ HttpStructuredCacheService::GetUpstreamCacheChunks(std::string_view Names Request.Value = std::move(Compressed); } } - Key.ChunkId = Params.RawHash; - Request.Exists = true; - Request.TotalSize = Params.RawSize; - Request.TotalSizeKnown = true; - Request.Source = Params.Source; + Key.ChunkId = Params.RawHash; + Request.Exists = true; + Request.RawSize = Params.RawSize; + Request.RawSizeKnown = true; + Request.Source = Params.Source; m_CacheStats.UpstreamHitCount++; }; @@ -2482,7 +2496,7 @@ HttpStructuredCacheService::WriteGetCacheChunksResponse(uint32_t Accept } else { - Writer.AddInteger("RawSize"sv, Request.TotalSize); + Writer.AddInteger("RawSize"sv, Request.RawSize); } ZEN_DEBUG("GETCACHECHUNKS HIT - '{}/{}/{}/{}' {} '{}' ({}) in {}", @@ -2490,7 +2504,7 @@ HttpStructuredCacheService::WriteGetCacheChunksResponse(uint32_t Accept Request.Key->Key.Bucket, Request.Key->Key.Hash, Request.Key->ValueId, - NiceBytes(Request.TotalSize), + NiceBytes(Request.RawSize), Request.IsRecordRequest ? "Record"sv : "Value"sv, Request.Source ? Request.Source->Url : "LOCAL"sv, NiceLatencyNs(Request.ElapsedTimeUs * 1000)); @@ -2507,12 +2521,12 @@ HttpStructuredCacheService::WriteGetCacheChunksResponse(uint32_t Accept } else { - ZEN_DEBUG("GETCACHECHUNKS MISS - '{}/{}/{}/{}' in {}", - Namespace, - Request.Key->Key.Bucket, - Request.Key->Key.Hash, - Request.Key->ValueId, - NiceLatencyNs(Request.ElapsedTimeUs * 1000)); + ZEN_INFO("GETCACHECHUNKS MISS - '{}/{}/{}/{}' in {}", + Namespace, + Request.Key->Key.Bucket, + Request.Key->Key.Hash, + Request.Key->ValueId, + NiceLatencyNs(Request.ElapsedTimeUs * 1000)); m_CacheStats.MissCount++; } } |