diff options
| author | Dan Engelbrecht <[email protected]> | 2022-12-07 11:21:41 +0100 |
|---|---|---|
| committer | GitHub <[email protected]> | 2022-12-07 02:21:41 -0800 |
| commit | 100c8f966b1c5b2fb190748f0177600562d1c5fe (patch) | |
| tree | fc85e350dea47330149a1d42eb7a6c7ae0a06111 /zenserver | |
| parent | Cache request record/replay (#198) (diff) | |
| download | zen-100c8f966b1c5b2fb190748f0177600562d1c5fe.tar.xz zen-100c8f966b1c5b2fb190748f0177600562d1c5fe.zip | |
optimizations (#200)
* Use direct file read and direct buffer allocation for small IoBuffer materalization
* Reduce range of materialized data in CompositeBuffer reading
CompressedBuffer header reading often only need a small part and not the whole file
* reduce lock contention in IoBuffer::Materialize
* Reduce parsing of compressed headers
Validate header type at decompression
* faster CreateDirectories - start from leaf going up and recurse back
* optimized BufferHeader::IsValid
* Add ValidateCompressedHeader to use when we don't need the actual compressed data
Validate that we always get compressed data in CidStore::AddChunk
* changelog
Diffstat (limited to 'zenserver')
| -rw-r--r-- | zenserver/cache/structuredcache.cpp | 121 | ||||
| -rw-r--r-- | zenserver/cache/structuredcachestore.cpp | 47 | ||||
| -rw-r--r-- | zenserver/cidstore.cpp | 26 | ||||
| -rw-r--r-- | zenserver/compute/function.cpp | 6 | ||||
| -rw-r--r-- | zenserver/projectstore.cpp | 50 | ||||
| -rw-r--r-- | zenserver/testing/launch.cpp | 6 | ||||
| -rw-r--r-- | zenserver/upstream/hordecompute.cpp | 8 | ||||
| -rw-r--r-- | zenserver/upstream/upstreamcache.cpp | 86 |
8 files changed, 201 insertions, 149 deletions
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp index baaf94dd0..f649efa01 100644 --- a/zenserver/cache/structuredcache.cpp +++ b/zenserver/cache/structuredcache.cpp @@ -747,8 +747,8 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request { if (IoBuffer Chunk = m_CidStore.FindChunkByCid(AttachmentHash.AsHash())) { - Package.AddAttachment( - CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Chunk)), AttachmentHash.AsHash())); + CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(std::move(Chunk)); + Package.AddAttachment(CbAttachment(Compressed, AttachmentHash.AsHash())); } else { @@ -907,7 +907,7 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request } else if (IoBuffer Chunk = m_CidStore.FindChunkByCid(Hash)) { - CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Chunk)); + CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(std::move(Chunk)); if (Compressed) { Package.AddAttachment(CbAttachment(Compressed, Hash)); @@ -939,8 +939,9 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request for (const CbAttachment* Attachment : AttachmentsToStoreLocally) { - CompressedBuffer Chunk = Attachment->AsCompressedBinary(); - CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Chunk); + CompressedBuffer Chunk = Attachment->AsCompressedBinary(); + CidStore::InsertResult InsertResult = + m_CidStore.AddChunk(Chunk.GetCompressed().Flatten().AsIoBuffer(), Attachment->GetHash()); if (InsertResult.New) { Count.New++; @@ -1173,7 +1174,7 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request for (const CbAttachment* Attachment : AttachmentsToStoreLocally) { CompressedBuffer Chunk = Attachment->AsCompressedBinary(); - CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Chunk); + CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Chunk.GetCompressed().Flatten().AsIoBuffer(), Attachment->GetHash()); if (InsertResult.New) { Count.New++; @@ -1243,10 +1244,19 @@ HttpStructuredCacheService::HandleGetCacheChunk(zen::HttpServerRequest& Request, m_UpstreamCache.GetCacheChunk(Ref.Namespace, {Ref.BucketSegment, Ref.HashKey}, Ref.ValueContentId); UpstreamResult.Status.Success) { - if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(UpstreamResult.Value))) + IoHash RawHash; + uint64_t RawSize; + if (CompressedBuffer::ValidateCompressedHeader(UpstreamResult.Value, RawHash, RawSize)) { - m_CidStore.AddChunk(Compressed); - Source = UpstreamResult.Source; + if (RawHash == Ref.ValueContentId) + { + m_CidStore.AddChunk(UpstreamResult.Value, RawHash); + Source = UpstreamResult.Source; + } + else + { + ZEN_WARN("got missmatching upstream cache value"); + } } else { @@ -1312,21 +1322,21 @@ HttpStructuredCacheService::HandlePutCacheChunk(zen::HttpServerRequest& Request, Body.SetContentType(Request.RequestContentType()); - CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Body)); - - if (!Compressed) + IoHash RawHash; + uint64_t RawSize; + if (!CompressedBuffer::ValidateCompressedHeader(Body, RawHash, RawSize)) { return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Attachments must be compressed"sv); } - if (IoHash::FromBLAKE3(Compressed.GetRawHash()) != Ref.ValueContentId) + if (RawHash != Ref.ValueContentId) { return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "ValueContentId does not match attachment hash"sv); } - CidStore::InsertResult Result = m_CidStore.AddChunk(Compressed); + CidStore::InsertResult Result = m_CidStore.AddChunk(Body, RawHash); ZEN_DEBUG("PUTCACHECHUNK - '{}/{}/{}/{}' {} '{}' ({}) in {}", Ref.Namespace, @@ -1625,7 +1635,7 @@ HttpStructuredCacheService::PutCacheRecord(PutRequestData& Request, const CbPack for (const CbAttachment* Attachment : AttachmentsToStoreLocally) { CompressedBuffer Chunk = Attachment->AsCompressedBinary(); - CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Chunk); + CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Chunk.GetCompressed().Flatten().AsIoBuffer(), Attachment->GetHash()); if (InsertResult.New) { Count.New++; @@ -1788,7 +1798,7 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(CbObjectView RpcRequest) if (IoBuffer Chunk = m_CidStore.FindChunkByCid(Value.ContentId)) { ZEN_ASSERT(Chunk.GetSize() > 0); - Value.Payload = CompressedBuffer::FromCompressed(SharedBuffer(Chunk)); + Value.Payload = CompressedBuffer::FromCompressedNoValidate(std::move(Chunk)); Value.Exists = true; } else @@ -1900,7 +1910,7 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(CbObjectView RpcRequest) Value.Exists = true; if (EnumHasAllFlags(ValuePolicy, CachePolicy::StoreLocal)) { - m_CidStore.AddChunk(Compressed); + m_CidStore.AddChunk(Compressed.GetCompressed().Flatten().AsIoBuffer(), Attachment->GetHash()); } if (!EnumHasAllFlags(ValuePolicy, CachePolicy::SkipData)) { @@ -1947,7 +1957,7 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(CbObjectView RpcRequest) { if (!EnumHasAllFlags(Value.DownstreamPolicy, CachePolicy::SkipData) && Value.Payload) { - ResponsePackage.AddAttachment(CbAttachment(Value.Payload, IoHash::FromBLAKE3(Value.Payload.GetRawHash()))); + ResponsePackage.AddAttachment(CbAttachment(Value.Payload, Value.ContentId)); } } @@ -2139,25 +2149,24 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(CbObjectView RpcRequest) PolicyText = RequestObject["Policy"sv].AsString(); Request.Policy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : DefaultPolicy; - CacheKey& Key = Request.Key; - CachePolicy Policy = Request.Policy; - CompressedBuffer& Result = Request.Result; + CacheKey& Key = Request.Key; + CachePolicy Policy = Request.Policy; ZenCacheValue CacheValue; if (EnumHasAllFlags(Policy, CachePolicy::QueryLocal)) { if (m_CacheStore.Get(*Namespace, Key.Bucket, Key.Hash, CacheValue) && IsCompressedBinary(CacheValue.Value.GetContentType())) { - Result = CompressedBuffer::FromCompressed(SharedBuffer(CacheValue.Value)); + Request.Result = CompressedBuffer::FromCompressed(SharedBuffer(CacheValue.Value), Request.RawHash, Request.RawSize); } } - if (Result) + if (Request.Result) { ZEN_DEBUG("GETCACHEVALUES HIT - '{}/{}/{}' {} ({}) in {}", *Namespace, Key.Bucket, Key.Hash, - NiceBytes(Result.GetCompressed().GetSize()), + NiceBytes(Request.Result.GetCompressed().GetSize()), "LOCAL"sv, NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); m_CacheStats.HitCount++; @@ -2216,7 +2225,7 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(CbObjectView RpcRequest) { if (HasData && !SkipData) { - Request.Result = CompressedBuffer::FromCompressed(SharedBuffer(Params.Value)); + Request.Result = CompressedBuffer::FromCompressed(SharedBuffer(Params.Value), Request.RawHash, Request.RawSize); } if (HasData && StoreData) @@ -2261,15 +2270,14 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(CbObjectView RpcRequest) const CompressedBuffer& Result = Request.Result; if (Result) { - IoHash Hash = IoHash::FromBLAKE3(Result.GetRawHash()); - ResponseObject.AddHash("RawHash"sv, Hash); + ResponseObject.AddHash("RawHash"sv, Request.RawHash); if (!EnumHasAllFlags(Request.Policy, CachePolicy::SkipData)) { - RpcResponse.AddAttachment(CbAttachment(Result, Hash)); + RpcResponse.AddAttachment(CbAttachment(Result, Request.RawHash)); } else { - ResponseObject.AddInteger("RawSize"sv, Result.GetRawSize()); + ResponseObject.AddInteger("RawSize"sv, Request.RawSize); } } else if (Request.RawHash != IoHash::Zero) @@ -2310,12 +2318,12 @@ namespace cache::detail { RecordBody* Record = nullptr; CompressedBuffer Value; const UpstreamEndpointInfo* Source = nullptr; - uint64_t TotalSize = 0; + uint64_t RawSize = 0; uint64_t RequestedSize = 0; uint64_t RequestedOffset = 0; CachePolicy DownstreamPolicy; bool Exists = false; - bool TotalSizeKnown = false; + bool RawSizeKnown = false; bool IsRecordRequest = false; uint64_t ElapsedTimeUs = 0; }; @@ -2569,9 +2577,9 @@ HttpStructuredCacheService::GetLocalCacheRecords(std::string_view Namespac { if (Value.ValueId == Request->Key->ValueId) { - Request->Key->ChunkId = Value.ContentId; - Request->TotalSize = Value.RawSize; - Request->TotalSizeKnown = true; + Request->Key->ChunkId = Value.ContentId; + Request->RawSize = Value.RawSize; + Request->RawSizeKnown = true; break; } } @@ -2582,7 +2590,7 @@ HttpStructuredCacheService::GetLocalCacheRecords(std::string_view Namespac { if (EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::QueryLocal)) { - if (EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::SkipData) && Request->TotalSizeKnown) + if (EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::SkipData) && Request->RawSizeKnown) { if (m_CidStore.ContainsChunk(Request->Key->ChunkId)) { @@ -2591,16 +2599,16 @@ HttpStructuredCacheService::GetLocalCacheRecords(std::string_view Namespac } else if (IoBuffer Payload = m_CidStore.FindChunkByCid(Request->Key->ChunkId)) { - CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Payload)); + IoHash RawHash; + CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Payload), RawHash, Request->RawSize); if (Compressed) { if (!EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::SkipData)) { Request->Value = Compressed; } - Request->Exists = true; - Request->TotalSize = Compressed.GetRawSize(); - Request->TotalSizeKnown = true; + Request->Exists = true; + Request->RawSizeKnown = true; } } } @@ -2631,17 +2639,19 @@ HttpStructuredCacheService::GetLocalCacheValues(std::string_view Namespa { if (IsCompressedBinary(CacheValue.Value.GetContentType())) { - CompressedBuffer Result = CompressedBuffer::FromCompressed(SharedBuffer(CacheValue.Value)); + IoHash RawHash; + uint64_t RawSize; + CompressedBuffer Result = CompressedBuffer::FromCompressed(SharedBuffer(CacheValue.Value), RawHash, RawSize); if (Result) { if (!EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::SkipData)) { Request->Value = Result; } - Request->Key->ChunkId = IoHash::FromBLAKE3(Result.GetRawHash()); - Request->Exists = true; - Request->TotalSize = Result.GetRawSize(); - Request->TotalSizeKnown = true; + Request->Key->ChunkId = RawHash; + Request->Exists = true; + Request->RawSize = RawSize; + Request->RawSizeKnown = true; } } } @@ -2683,9 +2693,10 @@ HttpStructuredCacheService::GetUpstreamCacheChunks(std::string_view Names if (EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::StoreLocal) || !EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::SkipData)) { - CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Params.Value)); - if (!Compressed || Compressed.GetRawSize() != Params.RawSize || - IoHash::FromBLAKE3(Compressed.GetRawHash()) != Params.RawHash) + IoHash RawHash; + uint64_t RawSize; + CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Params.Value), RawHash, RawSize); + if (!Compressed || RawSize != Params.RawSize || RawHash != Params.RawHash) { return; } @@ -2694,7 +2705,7 @@ HttpStructuredCacheService::GetUpstreamCacheChunks(std::string_view Names { if (Request.IsRecordRequest) { - m_CidStore.AddChunk(Compressed); + m_CidStore.AddChunk(Params.Value, RawHash); } else { @@ -2706,11 +2717,11 @@ HttpStructuredCacheService::GetUpstreamCacheChunks(std::string_view Names Request.Value = std::move(Compressed); } } - Key.ChunkId = Params.RawHash; - Request.Exists = true; - Request.TotalSize = Params.RawSize; - Request.TotalSizeKnown = true; - Request.Source = Params.Source; + Key.ChunkId = Params.RawHash; + Request.Exists = true; + Request.RawSize = Params.RawSize; + Request.RawSizeKnown = true; + Request.Source = Params.Source; m_CacheStats.UpstreamHitCount++; }; @@ -2741,7 +2752,7 @@ HttpStructuredCacheService::WriteGetCacheChunksResponse(std::string_view Namespa } else { - Writer.AddInteger("RawSize"sv, Request.TotalSize); + Writer.AddInteger("RawSize"sv, Request.RawSize); } ZEN_DEBUG("GETCACHECHUNKS HIT - '{}/{}/{}/{}' {} '{}' ({}) in {}", @@ -2749,7 +2760,7 @@ HttpStructuredCacheService::WriteGetCacheChunksResponse(std::string_view Namespa Request.Key->Key.Bucket, Request.Key->Key.Hash, Request.Key->ValueId, - NiceBytes(Request.TotalSize), + NiceBytes(Request.RawSize), Request.IsRecordRequest ? "Record"sv : "Value"sv, Request.Source ? Request.Source->Url : "LOCAL"sv, NiceLatencyNs(Request.ElapsedTimeUs * 1000)); diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp index 1f48aaebe..75f845cbf 100644 --- a/zenserver/cache/structuredcachestore.cpp +++ b/zenserver/cache/structuredcachestore.cpp @@ -491,7 +491,7 @@ ZenCacheMemoryLayer::CacheBucket::Scrub(ScrubContext& Ctx) std::vector<IoHash> BadHashes; - auto ValidateEntry = [](ZenContentType ContentType, IoBuffer Buffer) { + auto ValidateEntry = [](const IoHash& Hash, ZenContentType ContentType, IoBuffer Buffer) { if (ContentType == ZenContentType::kCbObject) { CbValidateError Error = ValidateCompactBinary(Buffer, CbValidateMode::All); @@ -499,7 +499,13 @@ ZenCacheMemoryLayer::CacheBucket::Scrub(ScrubContext& Ctx) } if (ContentType == ZenContentType::kCompressedBinary) { - if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Buffer)); !Compressed) + IoHash RawHash; + uint64_t RawSize; + if (!CompressedBuffer::ValidateCompressedHeader(Buffer, RawHash, RawSize)) + { + return false; + } + if (Hash != RawHash) { return false; } @@ -509,7 +515,7 @@ ZenCacheMemoryLayer::CacheBucket::Scrub(ScrubContext& Ctx) for (auto& Kv : m_CacheMap) { - if (!ValidateEntry(Kv.second.Payload.GetContentType(), Kv.second.Payload)) + if (!ValidateEntry(Kv.first, Kv.second.Payload.GetContentType(), Kv.second.Payload)) { BadHashes.push_back(Kv.first); } @@ -1021,7 +1027,7 @@ ZenCacheDiskLayer::CacheBucket::Scrub(ScrubContext& Ctx) std::vector<BlockStoreLocation> ChunkLocations; std::vector<IoHash> ChunkIndexToChunkHash; - auto ValidateEntry = [](ZenContentType ContentType, IoBuffer Buffer) { + auto ValidateEntry = [](const IoHash& Hash, ZenContentType ContentType, IoBuffer Buffer) { if (ContentType == ZenContentType::kCbObject) { CbValidateError Error = ValidateCompactBinary(Buffer, CbValidateMode::All); @@ -1029,7 +1035,13 @@ ZenCacheDiskLayer::CacheBucket::Scrub(ScrubContext& Ctx) } if (ContentType == ZenContentType::kCompressedBinary) { - if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Buffer)); !Compressed) + IoHash RawHash; + uint64_t RawSize; + if (!CompressedBuffer::ValidateCompressedHeader(Buffer, RawHash, RawSize)) + { + return false; + } + if (RawHash != Hash) { return false; } @@ -1077,7 +1089,7 @@ ZenCacheDiskLayer::CacheBucket::Scrub(ScrubContext& Ctx) BadKeys.push_back(HashKey); continue; } - if (!ValidateEntry(Loc.GetContentType(), Value.Value)) + if (!ValidateEntry(HashKey, Loc.GetContentType(), Value.Value)) { BadKeys.push_back(HashKey); continue; @@ -1108,7 +1120,7 @@ ZenCacheDiskLayer::CacheBucket::Scrub(ScrubContext& Ctx) return; } ZenContentType ContentType = m_Index.at(Hash).Location.GetContentType(); - if (!ValidateEntry(ContentType, Buffer)) + if (!ValidateEntry(Hash, ContentType, Buffer)) { BadKeys.push_back(Hash); return; @@ -1127,7 +1139,7 @@ ZenCacheDiskLayer::CacheBucket::Scrub(ScrubContext& Ctx) return; } ZenContentType ContentType = m_Index.at(Hash).Location.GetContentType(); - if (!ValidateEntry(ContentType, Buffer)) + if (!ValidateEntry(Hash, ContentType, Buffer)) { BadKeys.push_back(Hash); return; @@ -1678,18 +1690,8 @@ ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, c DataFile.MoveTemporaryIntoPlace(FsPath, Ec); if (Ec) { - std::filesystem::path ParentPath = FsPath.parent_path(); - if (!std::filesystem::is_directory(ParentPath)) - { - Ec.clear(); - std::filesystem::create_directories(ParentPath, Ec); - if (Ec) - { - throw std::system_error( - Ec, - fmt::format("Failed to create parent directory '{}' for file '{}' for put in '{}'", ParentPath, FsPath, m_BucketDir)); - } - } + CreateDirectories(FsPath.parent_path()); + Ec.clear(); // Try again DataFile.MoveTemporaryIntoPlace(FsPath, Ec); @@ -3100,7 +3102,8 @@ TEST_CASE("z$.scrub") { IoBuffer AttachmentData = CreateBinaryCacheValue(AttachmentSizes[Index]); CompressedBuffer CompressedAttachmentData = CompressedBuffer::Compress(SharedBuffer(AttachmentData)); - Record.AddBinaryAttachment(fmt::format("attachment-{}", Index), IoHash::FromBLAKE3(CompressedAttachmentData.GetRawHash())); + Record.AddBinaryAttachment(fmt::format("attachment-{}", Index), + IoHash::FromBLAKE3(CompressedAttachmentData.DecodeRawHash())); Result.Attachments[Index] = CompressedAttachmentData; } Result.Record = Record.Save().GetBuffer().AsIoBuffer(); @@ -3142,7 +3145,7 @@ TEST_CASE("z$.scrub") Zcs.Put("mybucket", Cid, {.Value = Record.Record}); for (const CompressedBuffer& Attachment : Record.Attachments) { - CidStore.AddChunk(Attachment); + CidStore.AddChunk(Attachment.GetCompressed().Flatten().AsIoBuffer(), IoHash::FromBLAKE3(Attachment.DecodeRawHash())); } } }; diff --git a/zenserver/cidstore.cpp b/zenserver/cidstore.cpp index 5de347a17..bce4f1dfb 100644 --- a/zenserver/cidstore.cpp +++ b/zenserver/cidstore.cpp @@ -39,22 +39,21 @@ HttpCidService::HttpCidService(CidStore& Store) : m_CidStore(Store) case HttpVerb::kPut: { - IoBuffer Payload = ServerRequest.ReadPayload(); - CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Payload)); - if (!Compressed) + IoBuffer Payload = ServerRequest.ReadPayload(); + IoHash RawHash; + uint64_t RawSize; + if (!CompressedBuffer::ValidateCompressedHeader(Payload, RawHash, RawSize)) { return ServerRequest.WriteResponse(HttpResponseCode::UnsupportedMediaType); } - IoHash PayloadHash = IoHash::FromBLAKE3(Compressed.GetRawHash()); - // URI hash must match content hash - if (PayloadHash != Hash) + if (RawHash != Hash) { return ServerRequest.WriteResponse(HttpResponseCode::BadRequest); } - m_CidStore.AddChunk(Compressed); + m_CidStore.AddChunk(Payload, RawHash); return ServerRequest.WriteResponse(HttpResponseCode::OK); } @@ -85,18 +84,17 @@ HttpCidService::HandleRequest(zen::HttpServerRequest& Request) case HttpVerb::kPut: case HttpVerb::kPost: { - IoBuffer Payload = Request.ReadPayload(); - CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Payload)); - if (!Compressed) + IoBuffer Payload = Request.ReadPayload(); + IoHash RawHash; + uint64_t RawSize; + if (!CompressedBuffer::ValidateCompressedHeader(Payload, RawHash, RawSize)) { return Request.WriteResponse(HttpResponseCode::UnsupportedMediaType); } - IoHash PayloadHash = IoHash::FromBLAKE3(Compressed.GetRawHash()); - - ZEN_DEBUG("CID POST request for {} ({} bytes)", PayloadHash, Payload.Size()); + ZEN_DEBUG("CID POST request for {} ({} bytes)", RawHash, Payload.Size()); - auto InsertResult = m_CidStore.AddChunk(Compressed); + auto InsertResult = m_CidStore.AddChunk(Payload, RawHash); if (InsertResult.New) { diff --git a/zenserver/compute/function.cpp b/zenserver/compute/function.cpp index d7316ac64..493e2666e 100644 --- a/zenserver/compute/function.cpp +++ b/zenserver/compute/function.cpp @@ -162,7 +162,8 @@ HttpFunctionService::HttpFunctionService(CidStore& InCidStore, TotalAttachmentBytes += Buffer.GetCompressedSize(); ++AttachmentCount; - const CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Buffer); + const CidStore::InsertResult InsertResult = + m_CidStore.AddChunk(Buffer.GetCompressed().Flatten().AsIoBuffer(), DataHash); if (InsertResult.New) { @@ -404,7 +405,8 @@ HttpFunctionService::HttpFunctionService(CidStore& InCidStore, TotalAttachmentBytes += CompressedSize; ++AttachmentCount; - const CidStore::InsertResult InsertResult = m_CidStore.AddChunk(DataView); + const CidStore::InsertResult InsertResult = + m_CidStore.AddChunk(DataView.GetCompressed().Flatten().AsIoBuffer(), DataHash); if (InsertResult.New) { diff --git a/zenserver/projectstore.cpp b/zenserver/projectstore.cpp index 3a65feb0f..2c44beaee 100644 --- a/zenserver/projectstore.cpp +++ b/zenserver/projectstore.cpp @@ -643,8 +643,8 @@ ProjectStore::Oplog::AppendNewOplogEntry(CbPackage OpPackage) ZEN_ASSERT(Attach.IsCompressedBinary()); CompressedBuffer AttachmentData = Attach.AsCompressedBinary(); - const uint64_t AttachmentSize = AttachmentData.GetRawSize(); - CidStore::InsertResult InsertResult = m_CidStore.AddChunk(AttachmentData); + const uint64_t AttachmentSize = AttachmentData.DecodeRawSize(); + CidStore::InsertResult InsertResult = m_CidStore.AddChunk(AttachmentData.GetCompressed().Flatten().AsIoBuffer(), Attach.GetHash()); if (InsertResult.New) { @@ -1410,9 +1410,11 @@ ProjectStore::GetChunkInfo(const std::string_view ProjectId, uint64_t ChunkSize = Chunk.GetSize(); if (Chunk.GetContentType() == HttpContentType::kCompressedBinary) { - CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Chunk)); - ZEN_ASSERT(!Compressed.IsNull()); - ChunkSize = Compressed.GetRawSize(); + IoHash RawHash; + uint64_t RawSize; + bool IsCompressed = CompressedBuffer::ValidateCompressedHeader(Chunk, RawHash, RawSize); + ZEN_ASSERT(IsCompressed); + ChunkSize = RawSize; } CbObjectWriter Response; @@ -1467,12 +1469,13 @@ ProjectStore::GetChunk(const std::string_view ProjectId, if (Chunk.GetContentType() == HttpContentType::kCompressedBinary) { - CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Chunk)); + IoHash RawHash; + uint64_t RawSize; + CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Chunk), RawHash, RawSize); ZEN_ASSERT(!Compressed.IsNull()); if (IsOffset) { - uint64_t RawSize = Compressed.GetRawSize(); if ((Offset + Size) > RawSize) { Size = RawSize - Offset; @@ -1542,7 +1545,7 @@ ProjectStore::GetChunk(const std::string_view Cid, ZenContentType AcceptType, Io if (AcceptType == HttpContentType::kBinary) { - CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(OutChunk)); + CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(std::move(OutChunk)); OutChunk = Compressed.Decompress().AsIoBuffer(); OutChunk.SetContentType(HttpContentType::kBinary); } @@ -1824,7 +1827,7 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects) if (AcceptType == HttpContentType::kBinary) { - CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Value)); + CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(std::move(Value)); Value = Compressed.Decompress().AsIoBuffer(); ContentType = HttpContentType::kBinary; } @@ -2069,7 +2072,7 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects) break; case ZenContentType::kCompressedBinary: - if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Payload))) + if (CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(std::move(Payload))) { Package.AddAttachment(CbAttachment(Compressed, AttachmentHash)); } @@ -2202,7 +2205,8 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects) { ZEN_INFO("Importing oplog '{}/{}'", ProjectId, OplogId); IoBuffer CompressedPayload = HttpReq.ReadPayload(); - IoBuffer Payload = CompressedBuffer::FromCompressed(SharedBuffer(CompressedPayload)).Decompress().AsIoBuffer(); + IoBuffer Payload = + CompressedBuffer::FromCompressedNoValidate(std::move(CompressedPayload)).Decompress().AsIoBuffer(); CbPackage RequestPackage = ParsePackageMessage(Payload); CbObject Request = RequestPackage.GetObject(); @@ -2278,7 +2282,9 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects) try { CompressedBuffer AttachmentBody = Attachment.AsCompressedBinary(); - m_CidStore.AddChunk(AttachmentBody, CidStore::InsertMode::kCopyOnly); + m_CidStore.AddChunk(AttachmentBody.GetCompressed().Flatten().AsIoBuffer(), + Attachment.GetHash(), + CidStore::InsertMode::kCopyOnly); } catch (std::exception& e) { @@ -2572,7 +2578,7 @@ namespace testutils { Object.BeginArray("bulkdata"); for (const auto& Attachment : Attachments) { - CbAttachment Attach(Attachment.second, IoHash::FromBLAKE3(Attachment.second.GetRawHash())); + CbAttachment Attach(Attachment.second, IoHash::FromBLAKE3(Attachment.second.DecodeRawHash())); Object.BeginObject(); Object << "id"sv << Attachment.first; Object << "type"sv @@ -2828,11 +2834,13 @@ TEST_CASE("project.store.partial.read") } { IoBuffer Chunk; - CHECK(ProjectStore.GetChunk(IoHash::FromBLAKE3(Attachments[OpIds[1]][0].second.GetRawHash()).ToHexString(), + CHECK(ProjectStore.GetChunk(IoHash::FromBLAKE3(Attachments[OpIds[1]][0].second.DecodeRawHash()).ToHexString(), HttpContentType::kCompressedBinary, Chunk) == HttpResponseCode::OK); - CompressedBuffer Attachment = CompressedBuffer::FromCompressed(SharedBuffer(Chunk)); - CHECK(Attachment.GetRawSize() == Attachments[OpIds[1]][0].second.GetRawSize()); + IoHash RawHash; + uint64_t RawSize; + CompressedBuffer Attachment = CompressedBuffer::FromCompressed(SharedBuffer(Chunk), RawHash, RawSize); + CHECK(RawSize == Attachments[OpIds[1]][0].second.DecodeRawSize()); } IoBuffer ChunkResult; @@ -2844,7 +2852,8 @@ TEST_CASE("project.store.partial.read") HttpContentType::kCompressedBinary, ChunkResult) == HttpResponseCode::OK); CHECK(ChunkResult); - CHECK(CompressedBuffer::FromCompressed(SharedBuffer(ChunkResult)).GetRawSize() == Attachments[OpIds[2]][1].second.GetRawSize()); + CHECK(CompressedBuffer::FromCompressedNoValidate(std::move(ChunkResult)).DecodeRawSize() == + Attachments[OpIds[2]][1].second.DecodeRawSize()); IoBuffer PartialChunkResult; CHECK(ProjectStore.GetChunk("proj1"sv, @@ -2855,8 +2864,11 @@ TEST_CASE("project.store.partial.read") HttpContentType::kCompressedBinary, PartialChunkResult) == HttpResponseCode::OK); CHECK(PartialChunkResult); - CompressedBuffer PartialCompressedResult = CompressedBuffer::FromCompressed(SharedBuffer(PartialChunkResult)); - CHECK(PartialCompressedResult.GetRawSize() >= 1773); + IoHash PartialRawHash; + uint64_t PartialRawSize; + CompressedBuffer PartialCompressedResult = + CompressedBuffer::FromCompressed(SharedBuffer(PartialChunkResult), PartialRawHash, PartialRawSize); + CHECK(PartialRawSize >= 1773); uint64_t RawOffsetInPartialCompressed = GetCompressedOffset(PartialCompressedResult, 5); SharedBuffer PartialDecompressed = PartialCompressedResult.Decompress(RawOffsetInPartialCompressed); diff --git a/zenserver/testing/launch.cpp b/zenserver/testing/launch.cpp index 0e46fff94..b26f9e437 100644 --- a/zenserver/testing/launch.cpp +++ b/zenserver/testing/launch.cpp @@ -477,7 +477,11 @@ HttpLaunchService::HttpLaunchService(CidStore& Store, const std::filesystem::pat { std::filesystem::path FullPath = SandboxDir / FileName; - CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Chunk)); + IoHash RawHash; + uint64_t RawSize; + CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Chunk), RawHash, RawSize); + ZEN_ASSERT(Compressed); + ZEN_ASSERT(FileHash == RawHash); CompositeBuffer CompositeBuffer = Compressed.DecompressToComposite(); std::span<const SharedBuffer> Segments = CompositeBuffer.GetSegments(); std::vector<IoBuffer> Chunks(Segments.size()); diff --git a/zenserver/upstream/hordecompute.cpp b/zenserver/upstream/hordecompute.cpp index 988386726..64d9fff72 100644 --- a/zenserver/upstream/hordecompute.cpp +++ b/zenserver/upstream/hordecompute.cpp @@ -983,10 +983,12 @@ namespace detail { return; } + IoHash RawHash; + uint64_t RawSize; CompressedBuffer AttachmentBuffer = - CompressedBuffer::FromCompressed(SharedBuffer(BinaryData[CompressedId])); + CompressedBuffer::FromCompressed(SharedBuffer(BinaryData[CompressedId]), RawHash, RawSize); - if (!AttachmentBuffer) + if (!AttachmentBuffer || RawHash != DecompressedId) { Log().warn( "Invalid output encountered (not valid CompressedBuffer format) {} compressed {} uncompressed", @@ -997,7 +999,7 @@ namespace detail { } ApplyResult.TotalAttachmentBytes += AttachmentBuffer.GetCompressedSize(); - ApplyResult.TotalRawAttachmentBytes += AttachmentBuffer.GetRawSize(); + ApplyResult.TotalRawAttachmentBytes += RawSize; CbAttachment Attachment(AttachmentBuffer, DecompressedId); OutputPackage.AddAttachment(Attachment); diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp index bc06653b9..6e5422007 100644 --- a/zenserver/upstream/upstreamcache.cpp +++ b/zenserver/upstream/upstreamcache.cpp @@ -214,7 +214,9 @@ namespace detail { Result.ElapsedSeconds += AttachmentResult.ElapsedSeconds; Result.ErrorCode = AttachmentResult.ErrorCode; - if (CompressedBuffer Chunk = CompressedBuffer::FromCompressed(SharedBuffer(AttachmentResult.Response))) + IoHash RawHash; + uint64_t RawSize; + if (CompressedBuffer::ValidateCompressedHeader(AttachmentResult.Response, RawHash, RawSize)) { Result.Response = AttachmentResult.Response; ++NumAttachments; @@ -251,7 +253,10 @@ namespace detail { Result.ElapsedSeconds += AttachmentResult.ElapsedSeconds; Result.ErrorCode = AttachmentResult.ErrorCode; - if (CompressedBuffer Chunk = CompressedBuffer::FromCompressed(SharedBuffer(AttachmentResult.Response))) + IoHash RawHash; + uint64_t RawSize; + if (CompressedBuffer Chunk = + CompressedBuffer::FromCompressed(SharedBuffer(AttachmentResult.Response), RawHash, RawSize)) { Package.AddAttachment(CbAttachment(Chunk, AttachmentHash.AsHash())); } @@ -335,9 +340,15 @@ namespace detail { if (BlobResult.ErrorCode == 0) { - if (CompressedBuffer Chunk = CompressedBuffer::FromCompressed(SharedBuffer(BlobResult.Response))) + IoHash RawHash; + uint64_t RawSize; + if (CompressedBuffer Chunk = + CompressedBuffer::FromCompressed(SharedBuffer(BlobResult.Response), RawHash, RawSize)) { - Package.AddAttachment(CbAttachment(Chunk, AttachmentHash.AsHash())); + if (RawHash == AttachmentHash.AsHash()) + { + Package.AddAttachment(CbAttachment(Chunk, RawHash)); + } } } }); @@ -398,9 +409,11 @@ namespace detail { { CacheChunkRequest& Request = *RequestPtr; IoBuffer Payload; + IoHash RawHash = IoHash::Zero; + uint64_t RawSize = 0; - double ElapsedSeconds = 0.0; - CompressedBuffer Compressed; + double ElapsedSeconds = 0.0; + bool IsCompressed = false; if (!Result.Error) { std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, Namespace); @@ -416,15 +429,15 @@ namespace detail { m_Status.SetFromErrorCode(BlobResult.ErrorCode, BlobResult.Reason); if (Payload && IsCompressedBinary(Payload.GetContentType())) { - Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Payload)); + IsCompressed = CompressedBuffer::ValidateCompressedHeader(Payload, RawHash, RawSize); } } - if (Compressed) + if (IsCompressed) { OnComplete({.Request = Request, - .RawHash = IoHash::FromBLAKE3(Compressed.GetRawHash()), - .RawSize = Compressed.GetRawSize(), + .RawHash = RawHash, + .RawSize = RawSize, .Value = Payload, .ElapsedSeconds = ElapsedSeconds, .Source = &m_Info}); @@ -451,9 +464,11 @@ namespace detail { { CacheValueRequest& Request = *RequestPtr; IoBuffer Payload; + IoHash RawHash = IoHash::Zero; + uint64_t RawSize = 0; - double ElapsedSeconds = 0.0; - CompressedBuffer Compressed; + double ElapsedSeconds = 0.0; + bool IsCompressed = false; if (!Result.Error) { std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, Namespace); @@ -470,13 +485,17 @@ namespace detail { { if (IsCompressedBinary(Payload.GetContentType())) { - Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Payload)); + IsCompressed = CompressedBuffer::ValidateCompressedHeader(Payload, RawHash, RawSize) && RawHash != PayloadHash; } else { - Compressed = CompressedBuffer::Compress(SharedBuffer(Payload)); - IoHash RawHash = IoHash::FromBLAKE3(Compressed.GetRawHash()); - if (RawHash != PayloadHash) + CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer(Payload)); + RawHash = IoHash::FromBLAKE3(Compressed.DecodeRawHash()); + if (RawHash == PayloadHash) + { + IsCompressed = true; + } + else { ZEN_WARN("Horde request for inline payload of {}/{}/{} has hash {}, expected hash {} from header", Namespace, @@ -484,17 +503,16 @@ namespace detail { Request.Key.Hash.ToHexString(), RawHash.ToHexString(), PayloadHash.ToHexString()); - Compressed.Reset(); } } } } - if (Compressed) + if (IsCompressed) { OnComplete({.Request = Request, - .RawHash = IoHash::FromBLAKE3(Compressed.GetRawHash()), - .RawSize = Compressed.GetRawSize(), + .RawHash = RawHash, + .RawSize = RawSize, .Value = Payload, .ElapsedSeconds = ElapsedSeconds, .Source = &m_Info}); @@ -543,17 +561,16 @@ namespace detail { } else if (CacheRecord.Type == ZenContentType::kCompressedBinary) { - CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(RecordValue)); - if (!Compressed) + IoHash RawHash; + uint64_t RawSize; + if (!CompressedBuffer::ValidateCompressedHeader(RecordValue, RawHash, RawSize)) { return {.Reason = std::string("Invalid compressed value buffer"), .Success = false}; } - IoHash RawHash = IoHash::FromBLAKE3(Compressed.GetRawHash()); - CbObjectWriter ReferencingObject; ReferencingObject.AddBinaryAttachment("RawHash", RawHash); - ReferencingObject.AddInteger("RawSize", Compressed.GetRawSize()); + ReferencingObject.AddInteger("RawSize", RawSize); return PerformStructuredPut( Session, @@ -1053,7 +1070,7 @@ namespace detail { { Payload = Compressed.GetCompressed().Flatten().AsIoBuffer(); Payload.SetContentType(ZenContentType::kCompressedBinary); - RawSize = Compressed.GetRawSize(); + RawSize = Compressed.DecodeRawSize(); Success = true; } } @@ -1189,7 +1206,7 @@ namespace detail { { Payload = Compressed.GetCompressed().Flatten().AsIoBuffer(); Payload.SetContentType(ZenContentType::kCompressedBinary); - RawSize = Compressed.GetRawSize(); + RawSize = Compressed.DecodeRawSize(); Success = true; } } @@ -1252,9 +1269,11 @@ namespace detail { for (const IoBuffer& Value : Values) { - if (CompressedBuffer AttachmentBuffer = CompressedBuffer::FromCompressed(SharedBuffer(Value))) + IoHash RawHash; + uint64_t RawSize; + if (CompressedBuffer AttachmentBuffer = CompressedBuffer::FromCompressed(SharedBuffer(Value), RawHash, RawSize)) { - Package.AddAttachment(CbAttachment(AttachmentBuffer, IoHash::FromBLAKE3(AttachmentBuffer.GetRawHash()))); + Package.AddAttachment(CbAttachment(AttachmentBuffer, RawHash)); } else { @@ -1282,7 +1301,9 @@ namespace detail { } else if (CacheRecord.Type == ZenContentType::kCompressedBinary) { - CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(RecordValue)); + IoHash RawHash; + uint64_t RawSize; + CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(RecordValue), RawHash, RawSize); if (!Compressed) { return {.Reason = std::string("Invalid value compressed buffer"), .Success = false}; @@ -1312,9 +1333,8 @@ namespace detail { } BatchWriter.EndObject(); // Policy unspecified and expected to be Default - IoHash Hash = IoHash::FromBLAKE3(Compressed.GetRawHash()); - BatchWriter.AddBinaryAttachment("RawHash"sv, Hash); - BatchPackage.AddAttachment(CbAttachment(Compressed, Hash)); + BatchWriter.AddBinaryAttachment("RawHash"sv, RawHash); + BatchPackage.AddAttachment(CbAttachment(Compressed, RawHash)); } BatchWriter.EndObject(); } |