diff options
Diffstat (limited to 'zenserver/cache')
| -rw-r--r-- | zenserver/cache/structuredcache.cpp | 121 | ||||
| -rw-r--r-- | zenserver/cache/structuredcachestore.cpp | 47 |
2 files changed, 91 insertions, 77 deletions
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp index baaf94dd0..f649efa01 100644 --- a/zenserver/cache/structuredcache.cpp +++ b/zenserver/cache/structuredcache.cpp @@ -747,8 +747,8 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request { if (IoBuffer Chunk = m_CidStore.FindChunkByCid(AttachmentHash.AsHash())) { - Package.AddAttachment( - CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Chunk)), AttachmentHash.AsHash())); + CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(std::move(Chunk)); + Package.AddAttachment(CbAttachment(Compressed, AttachmentHash.AsHash())); } else { @@ -907,7 +907,7 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request } else if (IoBuffer Chunk = m_CidStore.FindChunkByCid(Hash)) { - CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Chunk)); + CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(std::move(Chunk)); if (Compressed) { Package.AddAttachment(CbAttachment(Compressed, Hash)); @@ -939,8 +939,9 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request for (const CbAttachment* Attachment : AttachmentsToStoreLocally) { - CompressedBuffer Chunk = Attachment->AsCompressedBinary(); - CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Chunk); + CompressedBuffer Chunk = Attachment->AsCompressedBinary(); + CidStore::InsertResult InsertResult = + m_CidStore.AddChunk(Chunk.GetCompressed().Flatten().AsIoBuffer(), Attachment->GetHash()); if (InsertResult.New) { Count.New++; @@ -1173,7 +1174,7 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request for (const CbAttachment* Attachment : AttachmentsToStoreLocally) { CompressedBuffer Chunk = Attachment->AsCompressedBinary(); - CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Chunk); + CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Chunk.GetCompressed().Flatten().AsIoBuffer(), Attachment->GetHash()); if (InsertResult.New) { Count.New++; @@ -1243,10 +1244,19 @@ HttpStructuredCacheService::HandleGetCacheChunk(zen::HttpServerRequest& Request, m_UpstreamCache.GetCacheChunk(Ref.Namespace, {Ref.BucketSegment, Ref.HashKey}, Ref.ValueContentId); UpstreamResult.Status.Success) { - if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(UpstreamResult.Value))) + IoHash RawHash; + uint64_t RawSize; + if (CompressedBuffer::ValidateCompressedHeader(UpstreamResult.Value, RawHash, RawSize)) { - m_CidStore.AddChunk(Compressed); - Source = UpstreamResult.Source; + if (RawHash == Ref.ValueContentId) + { + m_CidStore.AddChunk(UpstreamResult.Value, RawHash); + Source = UpstreamResult.Source; + } + else + { + ZEN_WARN("got missmatching upstream cache value"); + } } else { @@ -1312,21 +1322,21 @@ HttpStructuredCacheService::HandlePutCacheChunk(zen::HttpServerRequest& Request, Body.SetContentType(Request.RequestContentType()); - CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Body)); - - if (!Compressed) + IoHash RawHash; + uint64_t RawSize; + if (!CompressedBuffer::ValidateCompressedHeader(Body, RawHash, RawSize)) { return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Attachments must be compressed"sv); } - if (IoHash::FromBLAKE3(Compressed.GetRawHash()) != Ref.ValueContentId) + if (RawHash != Ref.ValueContentId) { return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "ValueContentId does not match attachment hash"sv); } - CidStore::InsertResult Result = m_CidStore.AddChunk(Compressed); + CidStore::InsertResult Result = m_CidStore.AddChunk(Body, RawHash); ZEN_DEBUG("PUTCACHECHUNK - '{}/{}/{}/{}' {} '{}' ({}) in {}", Ref.Namespace, @@ -1625,7 +1635,7 @@ HttpStructuredCacheService::PutCacheRecord(PutRequestData& Request, const CbPack for (const CbAttachment* Attachment : AttachmentsToStoreLocally) { CompressedBuffer Chunk = Attachment->AsCompressedBinary(); - CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Chunk); + CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Chunk.GetCompressed().Flatten().AsIoBuffer(), Attachment->GetHash()); if (InsertResult.New) { Count.New++; @@ -1788,7 +1798,7 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(CbObjectView RpcRequest) if (IoBuffer Chunk = m_CidStore.FindChunkByCid(Value.ContentId)) { ZEN_ASSERT(Chunk.GetSize() > 0); - Value.Payload = CompressedBuffer::FromCompressed(SharedBuffer(Chunk)); + Value.Payload = CompressedBuffer::FromCompressedNoValidate(std::move(Chunk)); Value.Exists = true; } else @@ -1900,7 +1910,7 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(CbObjectView RpcRequest) Value.Exists = true; if (EnumHasAllFlags(ValuePolicy, CachePolicy::StoreLocal)) { - m_CidStore.AddChunk(Compressed); + m_CidStore.AddChunk(Compressed.GetCompressed().Flatten().AsIoBuffer(), Attachment->GetHash()); } if (!EnumHasAllFlags(ValuePolicy, CachePolicy::SkipData)) { @@ -1947,7 +1957,7 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(CbObjectView RpcRequest) { if (!EnumHasAllFlags(Value.DownstreamPolicy, CachePolicy::SkipData) && Value.Payload) { - ResponsePackage.AddAttachment(CbAttachment(Value.Payload, IoHash::FromBLAKE3(Value.Payload.GetRawHash()))); + ResponsePackage.AddAttachment(CbAttachment(Value.Payload, Value.ContentId)); } } @@ -2139,25 +2149,24 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(CbObjectView RpcRequest) PolicyText = RequestObject["Policy"sv].AsString(); Request.Policy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : DefaultPolicy; - CacheKey& Key = Request.Key; - CachePolicy Policy = Request.Policy; - CompressedBuffer& Result = Request.Result; + CacheKey& Key = Request.Key; + CachePolicy Policy = Request.Policy; ZenCacheValue CacheValue; if (EnumHasAllFlags(Policy, CachePolicy::QueryLocal)) { if (m_CacheStore.Get(*Namespace, Key.Bucket, Key.Hash, CacheValue) && IsCompressedBinary(CacheValue.Value.GetContentType())) { - Result = CompressedBuffer::FromCompressed(SharedBuffer(CacheValue.Value)); + Request.Result = CompressedBuffer::FromCompressed(SharedBuffer(CacheValue.Value), Request.RawHash, Request.RawSize); } } - if (Result) + if (Request.Result) { ZEN_DEBUG("GETCACHEVALUES HIT - '{}/{}/{}' {} ({}) in {}", *Namespace, Key.Bucket, Key.Hash, - NiceBytes(Result.GetCompressed().GetSize()), + NiceBytes(Request.Result.GetCompressed().GetSize()), "LOCAL"sv, NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); m_CacheStats.HitCount++; @@ -2216,7 +2225,7 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(CbObjectView RpcRequest) { if (HasData && !SkipData) { - Request.Result = CompressedBuffer::FromCompressed(SharedBuffer(Params.Value)); + Request.Result = CompressedBuffer::FromCompressed(SharedBuffer(Params.Value), Request.RawHash, Request.RawSize); } if (HasData && StoreData) @@ -2261,15 +2270,14 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(CbObjectView RpcRequest) const CompressedBuffer& Result = Request.Result; if (Result) { - IoHash Hash = IoHash::FromBLAKE3(Result.GetRawHash()); - ResponseObject.AddHash("RawHash"sv, Hash); + ResponseObject.AddHash("RawHash"sv, Request.RawHash); if (!EnumHasAllFlags(Request.Policy, CachePolicy::SkipData)) { - RpcResponse.AddAttachment(CbAttachment(Result, Hash)); + RpcResponse.AddAttachment(CbAttachment(Result, Request.RawHash)); } else { - ResponseObject.AddInteger("RawSize"sv, Result.GetRawSize()); + ResponseObject.AddInteger("RawSize"sv, Request.RawSize); } } else if (Request.RawHash != IoHash::Zero) @@ -2310,12 +2318,12 @@ namespace cache::detail { RecordBody* Record = nullptr; CompressedBuffer Value; const UpstreamEndpointInfo* Source = nullptr; - uint64_t TotalSize = 0; + uint64_t RawSize = 0; uint64_t RequestedSize = 0; uint64_t RequestedOffset = 0; CachePolicy DownstreamPolicy; bool Exists = false; - bool TotalSizeKnown = false; + bool RawSizeKnown = false; bool IsRecordRequest = false; uint64_t ElapsedTimeUs = 0; }; @@ -2569,9 +2577,9 @@ HttpStructuredCacheService::GetLocalCacheRecords(std::string_view Namespac { if (Value.ValueId == Request->Key->ValueId) { - Request->Key->ChunkId = Value.ContentId; - Request->TotalSize = Value.RawSize; - Request->TotalSizeKnown = true; + Request->Key->ChunkId = Value.ContentId; + Request->RawSize = Value.RawSize; + Request->RawSizeKnown = true; break; } } @@ -2582,7 +2590,7 @@ HttpStructuredCacheService::GetLocalCacheRecords(std::string_view Namespac { if (EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::QueryLocal)) { - if (EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::SkipData) && Request->TotalSizeKnown) + if (EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::SkipData) && Request->RawSizeKnown) { if (m_CidStore.ContainsChunk(Request->Key->ChunkId)) { @@ -2591,16 +2599,16 @@ HttpStructuredCacheService::GetLocalCacheRecords(std::string_view Namespac } else if (IoBuffer Payload = m_CidStore.FindChunkByCid(Request->Key->ChunkId)) { - CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Payload)); + IoHash RawHash; + CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Payload), RawHash, Request->RawSize); if (Compressed) { if (!EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::SkipData)) { Request->Value = Compressed; } - Request->Exists = true; - Request->TotalSize = Compressed.GetRawSize(); - Request->TotalSizeKnown = true; + Request->Exists = true; + Request->RawSizeKnown = true; } } } @@ -2631,17 +2639,19 @@ HttpStructuredCacheService::GetLocalCacheValues(std::string_view Namespa { if (IsCompressedBinary(CacheValue.Value.GetContentType())) { - CompressedBuffer Result = CompressedBuffer::FromCompressed(SharedBuffer(CacheValue.Value)); + IoHash RawHash; + uint64_t RawSize; + CompressedBuffer Result = CompressedBuffer::FromCompressed(SharedBuffer(CacheValue.Value), RawHash, RawSize); if (Result) { if (!EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::SkipData)) { Request->Value = Result; } - Request->Key->ChunkId = IoHash::FromBLAKE3(Result.GetRawHash()); - Request->Exists = true; - Request->TotalSize = Result.GetRawSize(); - Request->TotalSizeKnown = true; + Request->Key->ChunkId = RawHash; + Request->Exists = true; + Request->RawSize = RawSize; + Request->RawSizeKnown = true; } } } @@ -2683,9 +2693,10 @@ HttpStructuredCacheService::GetUpstreamCacheChunks(std::string_view Names if (EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::StoreLocal) || !EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::SkipData)) { - CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Params.Value)); - if (!Compressed || Compressed.GetRawSize() != Params.RawSize || - IoHash::FromBLAKE3(Compressed.GetRawHash()) != Params.RawHash) + IoHash RawHash; + uint64_t RawSize; + CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Params.Value), RawHash, RawSize); + if (!Compressed || RawSize != Params.RawSize || RawHash != Params.RawHash) { return; } @@ -2694,7 +2705,7 @@ HttpStructuredCacheService::GetUpstreamCacheChunks(std::string_view Names { if (Request.IsRecordRequest) { - m_CidStore.AddChunk(Compressed); + m_CidStore.AddChunk(Params.Value, RawHash); } else { @@ -2706,11 +2717,11 @@ HttpStructuredCacheService::GetUpstreamCacheChunks(std::string_view Names Request.Value = std::move(Compressed); } } - Key.ChunkId = Params.RawHash; - Request.Exists = true; - Request.TotalSize = Params.RawSize; - Request.TotalSizeKnown = true; - Request.Source = Params.Source; + Key.ChunkId = Params.RawHash; + Request.Exists = true; + Request.RawSize = Params.RawSize; + Request.RawSizeKnown = true; + Request.Source = Params.Source; m_CacheStats.UpstreamHitCount++; }; @@ -2741,7 +2752,7 @@ HttpStructuredCacheService::WriteGetCacheChunksResponse(std::string_view Namespa } else { - Writer.AddInteger("RawSize"sv, Request.TotalSize); + Writer.AddInteger("RawSize"sv, Request.RawSize); } ZEN_DEBUG("GETCACHECHUNKS HIT - '{}/{}/{}/{}' {} '{}' ({}) in {}", @@ -2749,7 +2760,7 @@ HttpStructuredCacheService::WriteGetCacheChunksResponse(std::string_view Namespa Request.Key->Key.Bucket, Request.Key->Key.Hash, Request.Key->ValueId, - NiceBytes(Request.TotalSize), + NiceBytes(Request.RawSize), Request.IsRecordRequest ? "Record"sv : "Value"sv, Request.Source ? Request.Source->Url : "LOCAL"sv, NiceLatencyNs(Request.ElapsedTimeUs * 1000)); diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp index 1f48aaebe..75f845cbf 100644 --- a/zenserver/cache/structuredcachestore.cpp +++ b/zenserver/cache/structuredcachestore.cpp @@ -491,7 +491,7 @@ ZenCacheMemoryLayer::CacheBucket::Scrub(ScrubContext& Ctx) std::vector<IoHash> BadHashes; - auto ValidateEntry = [](ZenContentType ContentType, IoBuffer Buffer) { + auto ValidateEntry = [](const IoHash& Hash, ZenContentType ContentType, IoBuffer Buffer) { if (ContentType == ZenContentType::kCbObject) { CbValidateError Error = ValidateCompactBinary(Buffer, CbValidateMode::All); @@ -499,7 +499,13 @@ ZenCacheMemoryLayer::CacheBucket::Scrub(ScrubContext& Ctx) } if (ContentType == ZenContentType::kCompressedBinary) { - if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Buffer)); !Compressed) + IoHash RawHash; + uint64_t RawSize; + if (!CompressedBuffer::ValidateCompressedHeader(Buffer, RawHash, RawSize)) + { + return false; + } + if (Hash != RawHash) { return false; } @@ -509,7 +515,7 @@ ZenCacheMemoryLayer::CacheBucket::Scrub(ScrubContext& Ctx) for (auto& Kv : m_CacheMap) { - if (!ValidateEntry(Kv.second.Payload.GetContentType(), Kv.second.Payload)) + if (!ValidateEntry(Kv.first, Kv.second.Payload.GetContentType(), Kv.second.Payload)) { BadHashes.push_back(Kv.first); } @@ -1021,7 +1027,7 @@ ZenCacheDiskLayer::CacheBucket::Scrub(ScrubContext& Ctx) std::vector<BlockStoreLocation> ChunkLocations; std::vector<IoHash> ChunkIndexToChunkHash; - auto ValidateEntry = [](ZenContentType ContentType, IoBuffer Buffer) { + auto ValidateEntry = [](const IoHash& Hash, ZenContentType ContentType, IoBuffer Buffer) { if (ContentType == ZenContentType::kCbObject) { CbValidateError Error = ValidateCompactBinary(Buffer, CbValidateMode::All); @@ -1029,7 +1035,13 @@ ZenCacheDiskLayer::CacheBucket::Scrub(ScrubContext& Ctx) } if (ContentType == ZenContentType::kCompressedBinary) { - if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Buffer)); !Compressed) + IoHash RawHash; + uint64_t RawSize; + if (!CompressedBuffer::ValidateCompressedHeader(Buffer, RawHash, RawSize)) + { + return false; + } + if (RawHash != Hash) { return false; } @@ -1077,7 +1089,7 @@ ZenCacheDiskLayer::CacheBucket::Scrub(ScrubContext& Ctx) BadKeys.push_back(HashKey); continue; } - if (!ValidateEntry(Loc.GetContentType(), Value.Value)) + if (!ValidateEntry(HashKey, Loc.GetContentType(), Value.Value)) { BadKeys.push_back(HashKey); continue; @@ -1108,7 +1120,7 @@ ZenCacheDiskLayer::CacheBucket::Scrub(ScrubContext& Ctx) return; } ZenContentType ContentType = m_Index.at(Hash).Location.GetContentType(); - if (!ValidateEntry(ContentType, Buffer)) + if (!ValidateEntry(Hash, ContentType, Buffer)) { BadKeys.push_back(Hash); return; @@ -1127,7 +1139,7 @@ ZenCacheDiskLayer::CacheBucket::Scrub(ScrubContext& Ctx) return; } ZenContentType ContentType = m_Index.at(Hash).Location.GetContentType(); - if (!ValidateEntry(ContentType, Buffer)) + if (!ValidateEntry(Hash, ContentType, Buffer)) { BadKeys.push_back(Hash); return; @@ -1678,18 +1690,8 @@ ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, c DataFile.MoveTemporaryIntoPlace(FsPath, Ec); if (Ec) { - std::filesystem::path ParentPath = FsPath.parent_path(); - if (!std::filesystem::is_directory(ParentPath)) - { - Ec.clear(); - std::filesystem::create_directories(ParentPath, Ec); - if (Ec) - { - throw std::system_error( - Ec, - fmt::format("Failed to create parent directory '{}' for file '{}' for put in '{}'", ParentPath, FsPath, m_BucketDir)); - } - } + CreateDirectories(FsPath.parent_path()); + Ec.clear(); // Try again DataFile.MoveTemporaryIntoPlace(FsPath, Ec); @@ -3100,7 +3102,8 @@ TEST_CASE("z$.scrub") { IoBuffer AttachmentData = CreateBinaryCacheValue(AttachmentSizes[Index]); CompressedBuffer CompressedAttachmentData = CompressedBuffer::Compress(SharedBuffer(AttachmentData)); - Record.AddBinaryAttachment(fmt::format("attachment-{}", Index), IoHash::FromBLAKE3(CompressedAttachmentData.GetRawHash())); + Record.AddBinaryAttachment(fmt::format("attachment-{}", Index), + IoHash::FromBLAKE3(CompressedAttachmentData.DecodeRawHash())); Result.Attachments[Index] = CompressedAttachmentData; } Result.Record = Record.Save().GetBuffer().AsIoBuffer(); @@ -3142,7 +3145,7 @@ TEST_CASE("z$.scrub") Zcs.Put("mybucket", Cid, {.Value = Record.Record}); for (const CompressedBuffer& Attachment : Record.Attachments) { - CidStore.AddChunk(Attachment); + CidStore.AddChunk(Attachment.GetCompressed().Flatten().AsIoBuffer(), IoHash::FromBLAKE3(Attachment.DecodeRawHash())); } } }; |