aboutsummaryrefslogtreecommitdiff
path: root/zenserver
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2022-12-07 11:21:41 +0100
committerGitHub <[email protected]>2022-12-07 02:21:41 -0800
commit100c8f966b1c5b2fb190748f0177600562d1c5fe (patch)
treefc85e350dea47330149a1d42eb7a6c7ae0a06111 /zenserver
parentCache request record/replay (#198) (diff)
downloadzen-100c8f966b1c5b2fb190748f0177600562d1c5fe.tar.xz
zen-100c8f966b1c5b2fb190748f0177600562d1c5fe.zip
optimizations (#200)
* Use direct file read and direct buffer allocation for small IoBuffer materalization * Reduce range of materialized data in CompositeBuffer reading CompressedBuffer header reading often only need a small part and not the whole file * reduce lock contention in IoBuffer::Materialize * Reduce parsing of compressed headers Validate header type at decompression * faster CreateDirectories - start from leaf going up and recurse back * optimized BufferHeader::IsValid * Add ValidateCompressedHeader to use when we don't need the actual compressed data Validate that we always get compressed data in CidStore::AddChunk * changelog
Diffstat (limited to 'zenserver')
-rw-r--r--zenserver/cache/structuredcache.cpp121
-rw-r--r--zenserver/cache/structuredcachestore.cpp47
-rw-r--r--zenserver/cidstore.cpp26
-rw-r--r--zenserver/compute/function.cpp6
-rw-r--r--zenserver/projectstore.cpp50
-rw-r--r--zenserver/testing/launch.cpp6
-rw-r--r--zenserver/upstream/hordecompute.cpp8
-rw-r--r--zenserver/upstream/upstreamcache.cpp86
8 files changed, 201 insertions, 149 deletions
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp
index baaf94dd0..f649efa01 100644
--- a/zenserver/cache/structuredcache.cpp
+++ b/zenserver/cache/structuredcache.cpp
@@ -747,8 +747,8 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request
{
if (IoBuffer Chunk = m_CidStore.FindChunkByCid(AttachmentHash.AsHash()))
{
- Package.AddAttachment(
- CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Chunk)), AttachmentHash.AsHash()));
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(std::move(Chunk));
+ Package.AddAttachment(CbAttachment(Compressed, AttachmentHash.AsHash()));
}
else
{
@@ -907,7 +907,7 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request
}
else if (IoBuffer Chunk = m_CidStore.FindChunkByCid(Hash))
{
- CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Chunk));
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(std::move(Chunk));
if (Compressed)
{
Package.AddAttachment(CbAttachment(Compressed, Hash));
@@ -939,8 +939,9 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request
for (const CbAttachment* Attachment : AttachmentsToStoreLocally)
{
- CompressedBuffer Chunk = Attachment->AsCompressedBinary();
- CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Chunk);
+ CompressedBuffer Chunk = Attachment->AsCompressedBinary();
+ CidStore::InsertResult InsertResult =
+ m_CidStore.AddChunk(Chunk.GetCompressed().Flatten().AsIoBuffer(), Attachment->GetHash());
if (InsertResult.New)
{
Count.New++;
@@ -1173,7 +1174,7 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request
for (const CbAttachment* Attachment : AttachmentsToStoreLocally)
{
CompressedBuffer Chunk = Attachment->AsCompressedBinary();
- CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Chunk);
+ CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Chunk.GetCompressed().Flatten().AsIoBuffer(), Attachment->GetHash());
if (InsertResult.New)
{
Count.New++;
@@ -1243,10 +1244,19 @@ HttpStructuredCacheService::HandleGetCacheChunk(zen::HttpServerRequest& Request,
m_UpstreamCache.GetCacheChunk(Ref.Namespace, {Ref.BucketSegment, Ref.HashKey}, Ref.ValueContentId);
UpstreamResult.Status.Success)
{
- if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(UpstreamResult.Value)))
+ IoHash RawHash;
+ uint64_t RawSize;
+ if (CompressedBuffer::ValidateCompressedHeader(UpstreamResult.Value, RawHash, RawSize))
{
- m_CidStore.AddChunk(Compressed);
- Source = UpstreamResult.Source;
+ if (RawHash == Ref.ValueContentId)
+ {
+ m_CidStore.AddChunk(UpstreamResult.Value, RawHash);
+ Source = UpstreamResult.Source;
+ }
+ else
+ {
+ ZEN_WARN("got missmatching upstream cache value");
+ }
}
else
{
@@ -1312,21 +1322,21 @@ HttpStructuredCacheService::HandlePutCacheChunk(zen::HttpServerRequest& Request,
Body.SetContentType(Request.RequestContentType());
- CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Body));
-
- if (!Compressed)
+ IoHash RawHash;
+ uint64_t RawSize;
+ if (!CompressedBuffer::ValidateCompressedHeader(Body, RawHash, RawSize))
{
return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Attachments must be compressed"sv);
}
- if (IoHash::FromBLAKE3(Compressed.GetRawHash()) != Ref.ValueContentId)
+ if (RawHash != Ref.ValueContentId)
{
return Request.WriteResponse(HttpResponseCode::BadRequest,
HttpContentType::kText,
"ValueContentId does not match attachment hash"sv);
}
- CidStore::InsertResult Result = m_CidStore.AddChunk(Compressed);
+ CidStore::InsertResult Result = m_CidStore.AddChunk(Body, RawHash);
ZEN_DEBUG("PUTCACHECHUNK - '{}/{}/{}/{}' {} '{}' ({}) in {}",
Ref.Namespace,
@@ -1625,7 +1635,7 @@ HttpStructuredCacheService::PutCacheRecord(PutRequestData& Request, const CbPack
for (const CbAttachment* Attachment : AttachmentsToStoreLocally)
{
CompressedBuffer Chunk = Attachment->AsCompressedBinary();
- CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Chunk);
+ CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Chunk.GetCompressed().Flatten().AsIoBuffer(), Attachment->GetHash());
if (InsertResult.New)
{
Count.New++;
@@ -1788,7 +1798,7 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(CbObjectView RpcRequest)
if (IoBuffer Chunk = m_CidStore.FindChunkByCid(Value.ContentId))
{
ZEN_ASSERT(Chunk.GetSize() > 0);
- Value.Payload = CompressedBuffer::FromCompressed(SharedBuffer(Chunk));
+ Value.Payload = CompressedBuffer::FromCompressedNoValidate(std::move(Chunk));
Value.Exists = true;
}
else
@@ -1900,7 +1910,7 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(CbObjectView RpcRequest)
Value.Exists = true;
if (EnumHasAllFlags(ValuePolicy, CachePolicy::StoreLocal))
{
- m_CidStore.AddChunk(Compressed);
+ m_CidStore.AddChunk(Compressed.GetCompressed().Flatten().AsIoBuffer(), Attachment->GetHash());
}
if (!EnumHasAllFlags(ValuePolicy, CachePolicy::SkipData))
{
@@ -1947,7 +1957,7 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(CbObjectView RpcRequest)
{
if (!EnumHasAllFlags(Value.DownstreamPolicy, CachePolicy::SkipData) && Value.Payload)
{
- ResponsePackage.AddAttachment(CbAttachment(Value.Payload, IoHash::FromBLAKE3(Value.Payload.GetRawHash())));
+ ResponsePackage.AddAttachment(CbAttachment(Value.Payload, Value.ContentId));
}
}
@@ -2139,25 +2149,24 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(CbObjectView RpcRequest)
PolicyText = RequestObject["Policy"sv].AsString();
Request.Policy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : DefaultPolicy;
- CacheKey& Key = Request.Key;
- CachePolicy Policy = Request.Policy;
- CompressedBuffer& Result = Request.Result;
+ CacheKey& Key = Request.Key;
+ CachePolicy Policy = Request.Policy;
ZenCacheValue CacheValue;
if (EnumHasAllFlags(Policy, CachePolicy::QueryLocal))
{
if (m_CacheStore.Get(*Namespace, Key.Bucket, Key.Hash, CacheValue) && IsCompressedBinary(CacheValue.Value.GetContentType()))
{
- Result = CompressedBuffer::FromCompressed(SharedBuffer(CacheValue.Value));
+ Request.Result = CompressedBuffer::FromCompressed(SharedBuffer(CacheValue.Value), Request.RawHash, Request.RawSize);
}
}
- if (Result)
+ if (Request.Result)
{
ZEN_DEBUG("GETCACHEVALUES HIT - '{}/{}/{}' {} ({}) in {}",
*Namespace,
Key.Bucket,
Key.Hash,
- NiceBytes(Result.GetCompressed().GetSize()),
+ NiceBytes(Request.Result.GetCompressed().GetSize()),
"LOCAL"sv,
NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
m_CacheStats.HitCount++;
@@ -2216,7 +2225,7 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(CbObjectView RpcRequest)
{
if (HasData && !SkipData)
{
- Request.Result = CompressedBuffer::FromCompressed(SharedBuffer(Params.Value));
+ Request.Result = CompressedBuffer::FromCompressed(SharedBuffer(Params.Value), Request.RawHash, Request.RawSize);
}
if (HasData && StoreData)
@@ -2261,15 +2270,14 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(CbObjectView RpcRequest)
const CompressedBuffer& Result = Request.Result;
if (Result)
{
- IoHash Hash = IoHash::FromBLAKE3(Result.GetRawHash());
- ResponseObject.AddHash("RawHash"sv, Hash);
+ ResponseObject.AddHash("RawHash"sv, Request.RawHash);
if (!EnumHasAllFlags(Request.Policy, CachePolicy::SkipData))
{
- RpcResponse.AddAttachment(CbAttachment(Result, Hash));
+ RpcResponse.AddAttachment(CbAttachment(Result, Request.RawHash));
}
else
{
- ResponseObject.AddInteger("RawSize"sv, Result.GetRawSize());
+ ResponseObject.AddInteger("RawSize"sv, Request.RawSize);
}
}
else if (Request.RawHash != IoHash::Zero)
@@ -2310,12 +2318,12 @@ namespace cache::detail {
RecordBody* Record = nullptr;
CompressedBuffer Value;
const UpstreamEndpointInfo* Source = nullptr;
- uint64_t TotalSize = 0;
+ uint64_t RawSize = 0;
uint64_t RequestedSize = 0;
uint64_t RequestedOffset = 0;
CachePolicy DownstreamPolicy;
bool Exists = false;
- bool TotalSizeKnown = false;
+ bool RawSizeKnown = false;
bool IsRecordRequest = false;
uint64_t ElapsedTimeUs = 0;
};
@@ -2569,9 +2577,9 @@ HttpStructuredCacheService::GetLocalCacheRecords(std::string_view Namespac
{
if (Value.ValueId == Request->Key->ValueId)
{
- Request->Key->ChunkId = Value.ContentId;
- Request->TotalSize = Value.RawSize;
- Request->TotalSizeKnown = true;
+ Request->Key->ChunkId = Value.ContentId;
+ Request->RawSize = Value.RawSize;
+ Request->RawSizeKnown = true;
break;
}
}
@@ -2582,7 +2590,7 @@ HttpStructuredCacheService::GetLocalCacheRecords(std::string_view Namespac
{
if (EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::QueryLocal))
{
- if (EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::SkipData) && Request->TotalSizeKnown)
+ if (EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::SkipData) && Request->RawSizeKnown)
{
if (m_CidStore.ContainsChunk(Request->Key->ChunkId))
{
@@ -2591,16 +2599,16 @@ HttpStructuredCacheService::GetLocalCacheRecords(std::string_view Namespac
}
else if (IoBuffer Payload = m_CidStore.FindChunkByCid(Request->Key->ChunkId))
{
- CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Payload));
+ IoHash RawHash;
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Payload), RawHash, Request->RawSize);
if (Compressed)
{
if (!EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::SkipData))
{
Request->Value = Compressed;
}
- Request->Exists = true;
- Request->TotalSize = Compressed.GetRawSize();
- Request->TotalSizeKnown = true;
+ Request->Exists = true;
+ Request->RawSizeKnown = true;
}
}
}
@@ -2631,17 +2639,19 @@ HttpStructuredCacheService::GetLocalCacheValues(std::string_view Namespa
{
if (IsCompressedBinary(CacheValue.Value.GetContentType()))
{
- CompressedBuffer Result = CompressedBuffer::FromCompressed(SharedBuffer(CacheValue.Value));
+ IoHash RawHash;
+ uint64_t RawSize;
+ CompressedBuffer Result = CompressedBuffer::FromCompressed(SharedBuffer(CacheValue.Value), RawHash, RawSize);
if (Result)
{
if (!EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::SkipData))
{
Request->Value = Result;
}
- Request->Key->ChunkId = IoHash::FromBLAKE3(Result.GetRawHash());
- Request->Exists = true;
- Request->TotalSize = Result.GetRawSize();
- Request->TotalSizeKnown = true;
+ Request->Key->ChunkId = RawHash;
+ Request->Exists = true;
+ Request->RawSize = RawSize;
+ Request->RawSizeKnown = true;
}
}
}
@@ -2683,9 +2693,10 @@ HttpStructuredCacheService::GetUpstreamCacheChunks(std::string_view Names
if (EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::StoreLocal) ||
!EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::SkipData))
{
- CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Params.Value));
- if (!Compressed || Compressed.GetRawSize() != Params.RawSize ||
- IoHash::FromBLAKE3(Compressed.GetRawHash()) != Params.RawHash)
+ IoHash RawHash;
+ uint64_t RawSize;
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Params.Value), RawHash, RawSize);
+ if (!Compressed || RawSize != Params.RawSize || RawHash != Params.RawHash)
{
return;
}
@@ -2694,7 +2705,7 @@ HttpStructuredCacheService::GetUpstreamCacheChunks(std::string_view Names
{
if (Request.IsRecordRequest)
{
- m_CidStore.AddChunk(Compressed);
+ m_CidStore.AddChunk(Params.Value, RawHash);
}
else
{
@@ -2706,11 +2717,11 @@ HttpStructuredCacheService::GetUpstreamCacheChunks(std::string_view Names
Request.Value = std::move(Compressed);
}
}
- Key.ChunkId = Params.RawHash;
- Request.Exists = true;
- Request.TotalSize = Params.RawSize;
- Request.TotalSizeKnown = true;
- Request.Source = Params.Source;
+ Key.ChunkId = Params.RawHash;
+ Request.Exists = true;
+ Request.RawSize = Params.RawSize;
+ Request.RawSizeKnown = true;
+ Request.Source = Params.Source;
m_CacheStats.UpstreamHitCount++;
};
@@ -2741,7 +2752,7 @@ HttpStructuredCacheService::WriteGetCacheChunksResponse(std::string_view Namespa
}
else
{
- Writer.AddInteger("RawSize"sv, Request.TotalSize);
+ Writer.AddInteger("RawSize"sv, Request.RawSize);
}
ZEN_DEBUG("GETCACHECHUNKS HIT - '{}/{}/{}/{}' {} '{}' ({}) in {}",
@@ -2749,7 +2760,7 @@ HttpStructuredCacheService::WriteGetCacheChunksResponse(std::string_view Namespa
Request.Key->Key.Bucket,
Request.Key->Key.Hash,
Request.Key->ValueId,
- NiceBytes(Request.TotalSize),
+ NiceBytes(Request.RawSize),
Request.IsRecordRequest ? "Record"sv : "Value"sv,
Request.Source ? Request.Source->Url : "LOCAL"sv,
NiceLatencyNs(Request.ElapsedTimeUs * 1000));
diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp
index 1f48aaebe..75f845cbf 100644
--- a/zenserver/cache/structuredcachestore.cpp
+++ b/zenserver/cache/structuredcachestore.cpp
@@ -491,7 +491,7 @@ ZenCacheMemoryLayer::CacheBucket::Scrub(ScrubContext& Ctx)
std::vector<IoHash> BadHashes;
- auto ValidateEntry = [](ZenContentType ContentType, IoBuffer Buffer) {
+ auto ValidateEntry = [](const IoHash& Hash, ZenContentType ContentType, IoBuffer Buffer) {
if (ContentType == ZenContentType::kCbObject)
{
CbValidateError Error = ValidateCompactBinary(Buffer, CbValidateMode::All);
@@ -499,7 +499,13 @@ ZenCacheMemoryLayer::CacheBucket::Scrub(ScrubContext& Ctx)
}
if (ContentType == ZenContentType::kCompressedBinary)
{
- if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Buffer)); !Compressed)
+ IoHash RawHash;
+ uint64_t RawSize;
+ if (!CompressedBuffer::ValidateCompressedHeader(Buffer, RawHash, RawSize))
+ {
+ return false;
+ }
+ if (Hash != RawHash)
{
return false;
}
@@ -509,7 +515,7 @@ ZenCacheMemoryLayer::CacheBucket::Scrub(ScrubContext& Ctx)
for (auto& Kv : m_CacheMap)
{
- if (!ValidateEntry(Kv.second.Payload.GetContentType(), Kv.second.Payload))
+ if (!ValidateEntry(Kv.first, Kv.second.Payload.GetContentType(), Kv.second.Payload))
{
BadHashes.push_back(Kv.first);
}
@@ -1021,7 +1027,7 @@ ZenCacheDiskLayer::CacheBucket::Scrub(ScrubContext& Ctx)
std::vector<BlockStoreLocation> ChunkLocations;
std::vector<IoHash> ChunkIndexToChunkHash;
- auto ValidateEntry = [](ZenContentType ContentType, IoBuffer Buffer) {
+ auto ValidateEntry = [](const IoHash& Hash, ZenContentType ContentType, IoBuffer Buffer) {
if (ContentType == ZenContentType::kCbObject)
{
CbValidateError Error = ValidateCompactBinary(Buffer, CbValidateMode::All);
@@ -1029,7 +1035,13 @@ ZenCacheDiskLayer::CacheBucket::Scrub(ScrubContext& Ctx)
}
if (ContentType == ZenContentType::kCompressedBinary)
{
- if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Buffer)); !Compressed)
+ IoHash RawHash;
+ uint64_t RawSize;
+ if (!CompressedBuffer::ValidateCompressedHeader(Buffer, RawHash, RawSize))
+ {
+ return false;
+ }
+ if (RawHash != Hash)
{
return false;
}
@@ -1077,7 +1089,7 @@ ZenCacheDiskLayer::CacheBucket::Scrub(ScrubContext& Ctx)
BadKeys.push_back(HashKey);
continue;
}
- if (!ValidateEntry(Loc.GetContentType(), Value.Value))
+ if (!ValidateEntry(HashKey, Loc.GetContentType(), Value.Value))
{
BadKeys.push_back(HashKey);
continue;
@@ -1108,7 +1120,7 @@ ZenCacheDiskLayer::CacheBucket::Scrub(ScrubContext& Ctx)
return;
}
ZenContentType ContentType = m_Index.at(Hash).Location.GetContentType();
- if (!ValidateEntry(ContentType, Buffer))
+ if (!ValidateEntry(Hash, ContentType, Buffer))
{
BadKeys.push_back(Hash);
return;
@@ -1127,7 +1139,7 @@ ZenCacheDiskLayer::CacheBucket::Scrub(ScrubContext& Ctx)
return;
}
ZenContentType ContentType = m_Index.at(Hash).Location.GetContentType();
- if (!ValidateEntry(ContentType, Buffer))
+ if (!ValidateEntry(Hash, ContentType, Buffer))
{
BadKeys.push_back(Hash);
return;
@@ -1678,18 +1690,8 @@ ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, c
DataFile.MoveTemporaryIntoPlace(FsPath, Ec);
if (Ec)
{
- std::filesystem::path ParentPath = FsPath.parent_path();
- if (!std::filesystem::is_directory(ParentPath))
- {
- Ec.clear();
- std::filesystem::create_directories(ParentPath, Ec);
- if (Ec)
- {
- throw std::system_error(
- Ec,
- fmt::format("Failed to create parent directory '{}' for file '{}' for put in '{}'", ParentPath, FsPath, m_BucketDir));
- }
- }
+ CreateDirectories(FsPath.parent_path());
+ Ec.clear();
// Try again
DataFile.MoveTemporaryIntoPlace(FsPath, Ec);
@@ -3100,7 +3102,8 @@ TEST_CASE("z$.scrub")
{
IoBuffer AttachmentData = CreateBinaryCacheValue(AttachmentSizes[Index]);
CompressedBuffer CompressedAttachmentData = CompressedBuffer::Compress(SharedBuffer(AttachmentData));
- Record.AddBinaryAttachment(fmt::format("attachment-{}", Index), IoHash::FromBLAKE3(CompressedAttachmentData.GetRawHash()));
+ Record.AddBinaryAttachment(fmt::format("attachment-{}", Index),
+ IoHash::FromBLAKE3(CompressedAttachmentData.DecodeRawHash()));
Result.Attachments[Index] = CompressedAttachmentData;
}
Result.Record = Record.Save().GetBuffer().AsIoBuffer();
@@ -3142,7 +3145,7 @@ TEST_CASE("z$.scrub")
Zcs.Put("mybucket", Cid, {.Value = Record.Record});
for (const CompressedBuffer& Attachment : Record.Attachments)
{
- CidStore.AddChunk(Attachment);
+ CidStore.AddChunk(Attachment.GetCompressed().Flatten().AsIoBuffer(), IoHash::FromBLAKE3(Attachment.DecodeRawHash()));
}
}
};
diff --git a/zenserver/cidstore.cpp b/zenserver/cidstore.cpp
index 5de347a17..bce4f1dfb 100644
--- a/zenserver/cidstore.cpp
+++ b/zenserver/cidstore.cpp
@@ -39,22 +39,21 @@ HttpCidService::HttpCidService(CidStore& Store) : m_CidStore(Store)
case HttpVerb::kPut:
{
- IoBuffer Payload = ServerRequest.ReadPayload();
- CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Payload));
- if (!Compressed)
+ IoBuffer Payload = ServerRequest.ReadPayload();
+ IoHash RawHash;
+ uint64_t RawSize;
+ if (!CompressedBuffer::ValidateCompressedHeader(Payload, RawHash, RawSize))
{
return ServerRequest.WriteResponse(HttpResponseCode::UnsupportedMediaType);
}
- IoHash PayloadHash = IoHash::FromBLAKE3(Compressed.GetRawHash());
-
// URI hash must match content hash
- if (PayloadHash != Hash)
+ if (RawHash != Hash)
{
return ServerRequest.WriteResponse(HttpResponseCode::BadRequest);
}
- m_CidStore.AddChunk(Compressed);
+ m_CidStore.AddChunk(Payload, RawHash);
return ServerRequest.WriteResponse(HttpResponseCode::OK);
}
@@ -85,18 +84,17 @@ HttpCidService::HandleRequest(zen::HttpServerRequest& Request)
case HttpVerb::kPut:
case HttpVerb::kPost:
{
- IoBuffer Payload = Request.ReadPayload();
- CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Payload));
- if (!Compressed)
+ IoBuffer Payload = Request.ReadPayload();
+ IoHash RawHash;
+ uint64_t RawSize;
+ if (!CompressedBuffer::ValidateCompressedHeader(Payload, RawHash, RawSize))
{
return Request.WriteResponse(HttpResponseCode::UnsupportedMediaType);
}
- IoHash PayloadHash = IoHash::FromBLAKE3(Compressed.GetRawHash());
-
- ZEN_DEBUG("CID POST request for {} ({} bytes)", PayloadHash, Payload.Size());
+ ZEN_DEBUG("CID POST request for {} ({} bytes)", RawHash, Payload.Size());
- auto InsertResult = m_CidStore.AddChunk(Compressed);
+ auto InsertResult = m_CidStore.AddChunk(Payload, RawHash);
if (InsertResult.New)
{
diff --git a/zenserver/compute/function.cpp b/zenserver/compute/function.cpp
index d7316ac64..493e2666e 100644
--- a/zenserver/compute/function.cpp
+++ b/zenserver/compute/function.cpp
@@ -162,7 +162,8 @@ HttpFunctionService::HttpFunctionService(CidStore& InCidStore,
TotalAttachmentBytes += Buffer.GetCompressedSize();
++AttachmentCount;
- const CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Buffer);
+ const CidStore::InsertResult InsertResult =
+ m_CidStore.AddChunk(Buffer.GetCompressed().Flatten().AsIoBuffer(), DataHash);
if (InsertResult.New)
{
@@ -404,7 +405,8 @@ HttpFunctionService::HttpFunctionService(CidStore& InCidStore,
TotalAttachmentBytes += CompressedSize;
++AttachmentCount;
- const CidStore::InsertResult InsertResult = m_CidStore.AddChunk(DataView);
+ const CidStore::InsertResult InsertResult =
+ m_CidStore.AddChunk(DataView.GetCompressed().Flatten().AsIoBuffer(), DataHash);
if (InsertResult.New)
{
diff --git a/zenserver/projectstore.cpp b/zenserver/projectstore.cpp
index 3a65feb0f..2c44beaee 100644
--- a/zenserver/projectstore.cpp
+++ b/zenserver/projectstore.cpp
@@ -643,8 +643,8 @@ ProjectStore::Oplog::AppendNewOplogEntry(CbPackage OpPackage)
ZEN_ASSERT(Attach.IsCompressedBinary());
CompressedBuffer AttachmentData = Attach.AsCompressedBinary();
- const uint64_t AttachmentSize = AttachmentData.GetRawSize();
- CidStore::InsertResult InsertResult = m_CidStore.AddChunk(AttachmentData);
+ const uint64_t AttachmentSize = AttachmentData.DecodeRawSize();
+ CidStore::InsertResult InsertResult = m_CidStore.AddChunk(AttachmentData.GetCompressed().Flatten().AsIoBuffer(), Attach.GetHash());
if (InsertResult.New)
{
@@ -1410,9 +1410,11 @@ ProjectStore::GetChunkInfo(const std::string_view ProjectId,
uint64_t ChunkSize = Chunk.GetSize();
if (Chunk.GetContentType() == HttpContentType::kCompressedBinary)
{
- CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Chunk));
- ZEN_ASSERT(!Compressed.IsNull());
- ChunkSize = Compressed.GetRawSize();
+ IoHash RawHash;
+ uint64_t RawSize;
+ bool IsCompressed = CompressedBuffer::ValidateCompressedHeader(Chunk, RawHash, RawSize);
+ ZEN_ASSERT(IsCompressed);
+ ChunkSize = RawSize;
}
CbObjectWriter Response;
@@ -1467,12 +1469,13 @@ ProjectStore::GetChunk(const std::string_view ProjectId,
if (Chunk.GetContentType() == HttpContentType::kCompressedBinary)
{
- CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Chunk));
+ IoHash RawHash;
+ uint64_t RawSize;
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Chunk), RawHash, RawSize);
ZEN_ASSERT(!Compressed.IsNull());
if (IsOffset)
{
- uint64_t RawSize = Compressed.GetRawSize();
if ((Offset + Size) > RawSize)
{
Size = RawSize - Offset;
@@ -1542,7 +1545,7 @@ ProjectStore::GetChunk(const std::string_view Cid, ZenContentType AcceptType, Io
if (AcceptType == HttpContentType::kBinary)
{
- CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(OutChunk));
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(std::move(OutChunk));
OutChunk = Compressed.Decompress().AsIoBuffer();
OutChunk.SetContentType(HttpContentType::kBinary);
}
@@ -1824,7 +1827,7 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects)
if (AcceptType == HttpContentType::kBinary)
{
- CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Value));
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(std::move(Value));
Value = Compressed.Decompress().AsIoBuffer();
ContentType = HttpContentType::kBinary;
}
@@ -2069,7 +2072,7 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects)
break;
case ZenContentType::kCompressedBinary:
- if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Payload)))
+ if (CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(std::move(Payload)))
{
Package.AddAttachment(CbAttachment(Compressed, AttachmentHash));
}
@@ -2202,7 +2205,8 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects)
{
ZEN_INFO("Importing oplog '{}/{}'", ProjectId, OplogId);
IoBuffer CompressedPayload = HttpReq.ReadPayload();
- IoBuffer Payload = CompressedBuffer::FromCompressed(SharedBuffer(CompressedPayload)).Decompress().AsIoBuffer();
+ IoBuffer Payload =
+ CompressedBuffer::FromCompressedNoValidate(std::move(CompressedPayload)).Decompress().AsIoBuffer();
CbPackage RequestPackage = ParsePackageMessage(Payload);
CbObject Request = RequestPackage.GetObject();
@@ -2278,7 +2282,9 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects)
try
{
CompressedBuffer AttachmentBody = Attachment.AsCompressedBinary();
- m_CidStore.AddChunk(AttachmentBody, CidStore::InsertMode::kCopyOnly);
+ m_CidStore.AddChunk(AttachmentBody.GetCompressed().Flatten().AsIoBuffer(),
+ Attachment.GetHash(),
+ CidStore::InsertMode::kCopyOnly);
}
catch (std::exception& e)
{
@@ -2572,7 +2578,7 @@ namespace testutils {
Object.BeginArray("bulkdata");
for (const auto& Attachment : Attachments)
{
- CbAttachment Attach(Attachment.second, IoHash::FromBLAKE3(Attachment.second.GetRawHash()));
+ CbAttachment Attach(Attachment.second, IoHash::FromBLAKE3(Attachment.second.DecodeRawHash()));
Object.BeginObject();
Object << "id"sv << Attachment.first;
Object << "type"sv
@@ -2828,11 +2834,13 @@ TEST_CASE("project.store.partial.read")
}
{
IoBuffer Chunk;
- CHECK(ProjectStore.GetChunk(IoHash::FromBLAKE3(Attachments[OpIds[1]][0].second.GetRawHash()).ToHexString(),
+ CHECK(ProjectStore.GetChunk(IoHash::FromBLAKE3(Attachments[OpIds[1]][0].second.DecodeRawHash()).ToHexString(),
HttpContentType::kCompressedBinary,
Chunk) == HttpResponseCode::OK);
- CompressedBuffer Attachment = CompressedBuffer::FromCompressed(SharedBuffer(Chunk));
- CHECK(Attachment.GetRawSize() == Attachments[OpIds[1]][0].second.GetRawSize());
+ IoHash RawHash;
+ uint64_t RawSize;
+ CompressedBuffer Attachment = CompressedBuffer::FromCompressed(SharedBuffer(Chunk), RawHash, RawSize);
+ CHECK(RawSize == Attachments[OpIds[1]][0].second.DecodeRawSize());
}
IoBuffer ChunkResult;
@@ -2844,7 +2852,8 @@ TEST_CASE("project.store.partial.read")
HttpContentType::kCompressedBinary,
ChunkResult) == HttpResponseCode::OK);
CHECK(ChunkResult);
- CHECK(CompressedBuffer::FromCompressed(SharedBuffer(ChunkResult)).GetRawSize() == Attachments[OpIds[2]][1].second.GetRawSize());
+ CHECK(CompressedBuffer::FromCompressedNoValidate(std::move(ChunkResult)).DecodeRawSize() ==
+ Attachments[OpIds[2]][1].second.DecodeRawSize());
IoBuffer PartialChunkResult;
CHECK(ProjectStore.GetChunk("proj1"sv,
@@ -2855,8 +2864,11 @@ TEST_CASE("project.store.partial.read")
HttpContentType::kCompressedBinary,
PartialChunkResult) == HttpResponseCode::OK);
CHECK(PartialChunkResult);
- CompressedBuffer PartialCompressedResult = CompressedBuffer::FromCompressed(SharedBuffer(PartialChunkResult));
- CHECK(PartialCompressedResult.GetRawSize() >= 1773);
+ IoHash PartialRawHash;
+ uint64_t PartialRawSize;
+ CompressedBuffer PartialCompressedResult =
+ CompressedBuffer::FromCompressed(SharedBuffer(PartialChunkResult), PartialRawHash, PartialRawSize);
+ CHECK(PartialRawSize >= 1773);
uint64_t RawOffsetInPartialCompressed = GetCompressedOffset(PartialCompressedResult, 5);
SharedBuffer PartialDecompressed = PartialCompressedResult.Decompress(RawOffsetInPartialCompressed);
diff --git a/zenserver/testing/launch.cpp b/zenserver/testing/launch.cpp
index 0e46fff94..b26f9e437 100644
--- a/zenserver/testing/launch.cpp
+++ b/zenserver/testing/launch.cpp
@@ -477,7 +477,11 @@ HttpLaunchService::HttpLaunchService(CidStore& Store, const std::filesystem::pat
{
std::filesystem::path FullPath = SandboxDir / FileName;
- CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Chunk));
+ IoHash RawHash;
+ uint64_t RawSize;
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Chunk), RawHash, RawSize);
+ ZEN_ASSERT(Compressed);
+ ZEN_ASSERT(FileHash == RawHash);
CompositeBuffer CompositeBuffer = Compressed.DecompressToComposite();
std::span<const SharedBuffer> Segments = CompositeBuffer.GetSegments();
std::vector<IoBuffer> Chunks(Segments.size());
diff --git a/zenserver/upstream/hordecompute.cpp b/zenserver/upstream/hordecompute.cpp
index 988386726..64d9fff72 100644
--- a/zenserver/upstream/hordecompute.cpp
+++ b/zenserver/upstream/hordecompute.cpp
@@ -983,10 +983,12 @@ namespace detail {
return;
}
+ IoHash RawHash;
+ uint64_t RawSize;
CompressedBuffer AttachmentBuffer =
- CompressedBuffer::FromCompressed(SharedBuffer(BinaryData[CompressedId]));
+ CompressedBuffer::FromCompressed(SharedBuffer(BinaryData[CompressedId]), RawHash, RawSize);
- if (!AttachmentBuffer)
+ if (!AttachmentBuffer || RawHash != DecompressedId)
{
Log().warn(
"Invalid output encountered (not valid CompressedBuffer format) {} compressed {} uncompressed",
@@ -997,7 +999,7 @@ namespace detail {
}
ApplyResult.TotalAttachmentBytes += AttachmentBuffer.GetCompressedSize();
- ApplyResult.TotalRawAttachmentBytes += AttachmentBuffer.GetRawSize();
+ ApplyResult.TotalRawAttachmentBytes += RawSize;
CbAttachment Attachment(AttachmentBuffer, DecompressedId);
OutputPackage.AddAttachment(Attachment);
diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp
index bc06653b9..6e5422007 100644
--- a/zenserver/upstream/upstreamcache.cpp
+++ b/zenserver/upstream/upstreamcache.cpp
@@ -214,7 +214,9 @@ namespace detail {
Result.ElapsedSeconds += AttachmentResult.ElapsedSeconds;
Result.ErrorCode = AttachmentResult.ErrorCode;
- if (CompressedBuffer Chunk = CompressedBuffer::FromCompressed(SharedBuffer(AttachmentResult.Response)))
+ IoHash RawHash;
+ uint64_t RawSize;
+ if (CompressedBuffer::ValidateCompressedHeader(AttachmentResult.Response, RawHash, RawSize))
{
Result.Response = AttachmentResult.Response;
++NumAttachments;
@@ -251,7 +253,10 @@ namespace detail {
Result.ElapsedSeconds += AttachmentResult.ElapsedSeconds;
Result.ErrorCode = AttachmentResult.ErrorCode;
- if (CompressedBuffer Chunk = CompressedBuffer::FromCompressed(SharedBuffer(AttachmentResult.Response)))
+ IoHash RawHash;
+ uint64_t RawSize;
+ if (CompressedBuffer Chunk =
+ CompressedBuffer::FromCompressed(SharedBuffer(AttachmentResult.Response), RawHash, RawSize))
{
Package.AddAttachment(CbAttachment(Chunk, AttachmentHash.AsHash()));
}
@@ -335,9 +340,15 @@ namespace detail {
if (BlobResult.ErrorCode == 0)
{
- if (CompressedBuffer Chunk = CompressedBuffer::FromCompressed(SharedBuffer(BlobResult.Response)))
+ IoHash RawHash;
+ uint64_t RawSize;
+ if (CompressedBuffer Chunk =
+ CompressedBuffer::FromCompressed(SharedBuffer(BlobResult.Response), RawHash, RawSize))
{
- Package.AddAttachment(CbAttachment(Chunk, AttachmentHash.AsHash()));
+ if (RawHash == AttachmentHash.AsHash())
+ {
+ Package.AddAttachment(CbAttachment(Chunk, RawHash));
+ }
}
}
});
@@ -398,9 +409,11 @@ namespace detail {
{
CacheChunkRequest& Request = *RequestPtr;
IoBuffer Payload;
+ IoHash RawHash = IoHash::Zero;
+ uint64_t RawSize = 0;
- double ElapsedSeconds = 0.0;
- CompressedBuffer Compressed;
+ double ElapsedSeconds = 0.0;
+ bool IsCompressed = false;
if (!Result.Error)
{
std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, Namespace);
@@ -416,15 +429,15 @@ namespace detail {
m_Status.SetFromErrorCode(BlobResult.ErrorCode, BlobResult.Reason);
if (Payload && IsCompressedBinary(Payload.GetContentType()))
{
- Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Payload));
+ IsCompressed = CompressedBuffer::ValidateCompressedHeader(Payload, RawHash, RawSize);
}
}
- if (Compressed)
+ if (IsCompressed)
{
OnComplete({.Request = Request,
- .RawHash = IoHash::FromBLAKE3(Compressed.GetRawHash()),
- .RawSize = Compressed.GetRawSize(),
+ .RawHash = RawHash,
+ .RawSize = RawSize,
.Value = Payload,
.ElapsedSeconds = ElapsedSeconds,
.Source = &m_Info});
@@ -451,9 +464,11 @@ namespace detail {
{
CacheValueRequest& Request = *RequestPtr;
IoBuffer Payload;
+ IoHash RawHash = IoHash::Zero;
+ uint64_t RawSize = 0;
- double ElapsedSeconds = 0.0;
- CompressedBuffer Compressed;
+ double ElapsedSeconds = 0.0;
+ bool IsCompressed = false;
if (!Result.Error)
{
std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, Namespace);
@@ -470,13 +485,17 @@ namespace detail {
{
if (IsCompressedBinary(Payload.GetContentType()))
{
- Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Payload));
+ IsCompressed = CompressedBuffer::ValidateCompressedHeader(Payload, RawHash, RawSize) && RawHash != PayloadHash;
}
else
{
- Compressed = CompressedBuffer::Compress(SharedBuffer(Payload));
- IoHash RawHash = IoHash::FromBLAKE3(Compressed.GetRawHash());
- if (RawHash != PayloadHash)
+ CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer(Payload));
+ RawHash = IoHash::FromBLAKE3(Compressed.DecodeRawHash());
+ if (RawHash == PayloadHash)
+ {
+ IsCompressed = true;
+ }
+ else
{
ZEN_WARN("Horde request for inline payload of {}/{}/{} has hash {}, expected hash {} from header",
Namespace,
@@ -484,17 +503,16 @@ namespace detail {
Request.Key.Hash.ToHexString(),
RawHash.ToHexString(),
PayloadHash.ToHexString());
- Compressed.Reset();
}
}
}
}
- if (Compressed)
+ if (IsCompressed)
{
OnComplete({.Request = Request,
- .RawHash = IoHash::FromBLAKE3(Compressed.GetRawHash()),
- .RawSize = Compressed.GetRawSize(),
+ .RawHash = RawHash,
+ .RawSize = RawSize,
.Value = Payload,
.ElapsedSeconds = ElapsedSeconds,
.Source = &m_Info});
@@ -543,17 +561,16 @@ namespace detail {
}
else if (CacheRecord.Type == ZenContentType::kCompressedBinary)
{
- CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(RecordValue));
- if (!Compressed)
+ IoHash RawHash;
+ uint64_t RawSize;
+ if (!CompressedBuffer::ValidateCompressedHeader(RecordValue, RawHash, RawSize))
{
return {.Reason = std::string("Invalid compressed value buffer"), .Success = false};
}
- IoHash RawHash = IoHash::FromBLAKE3(Compressed.GetRawHash());
-
CbObjectWriter ReferencingObject;
ReferencingObject.AddBinaryAttachment("RawHash", RawHash);
- ReferencingObject.AddInteger("RawSize", Compressed.GetRawSize());
+ ReferencingObject.AddInteger("RawSize", RawSize);
return PerformStructuredPut(
Session,
@@ -1053,7 +1070,7 @@ namespace detail {
{
Payload = Compressed.GetCompressed().Flatten().AsIoBuffer();
Payload.SetContentType(ZenContentType::kCompressedBinary);
- RawSize = Compressed.GetRawSize();
+ RawSize = Compressed.DecodeRawSize();
Success = true;
}
}
@@ -1189,7 +1206,7 @@ namespace detail {
{
Payload = Compressed.GetCompressed().Flatten().AsIoBuffer();
Payload.SetContentType(ZenContentType::kCompressedBinary);
- RawSize = Compressed.GetRawSize();
+ RawSize = Compressed.DecodeRawSize();
Success = true;
}
}
@@ -1252,9 +1269,11 @@ namespace detail {
for (const IoBuffer& Value : Values)
{
- if (CompressedBuffer AttachmentBuffer = CompressedBuffer::FromCompressed(SharedBuffer(Value)))
+ IoHash RawHash;
+ uint64_t RawSize;
+ if (CompressedBuffer AttachmentBuffer = CompressedBuffer::FromCompressed(SharedBuffer(Value), RawHash, RawSize))
{
- Package.AddAttachment(CbAttachment(AttachmentBuffer, IoHash::FromBLAKE3(AttachmentBuffer.GetRawHash())));
+ Package.AddAttachment(CbAttachment(AttachmentBuffer, RawHash));
}
else
{
@@ -1282,7 +1301,9 @@ namespace detail {
}
else if (CacheRecord.Type == ZenContentType::kCompressedBinary)
{
- CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(RecordValue));
+ IoHash RawHash;
+ uint64_t RawSize;
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(RecordValue), RawHash, RawSize);
if (!Compressed)
{
return {.Reason = std::string("Invalid value compressed buffer"), .Success = false};
@@ -1312,9 +1333,8 @@ namespace detail {
}
BatchWriter.EndObject();
// Policy unspecified and expected to be Default
- IoHash Hash = IoHash::FromBLAKE3(Compressed.GetRawHash());
- BatchWriter.AddBinaryAttachment("RawHash"sv, Hash);
- BatchPackage.AddAttachment(CbAttachment(Compressed, Hash));
+ BatchWriter.AddBinaryAttachment("RawHash"sv, RawHash);
+ BatchPackage.AddAttachment(CbAttachment(Compressed, RawHash));
}
BatchWriter.EndObject();
}