aboutsummaryrefslogtreecommitdiff
path: root/zenserver/cache/structuredcache.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'zenserver/cache/structuredcache.cpp')
-rw-r--r--zenserver/cache/structuredcache.cpp121
1 files changed, 66 insertions, 55 deletions
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp
index baaf94dd0..f649efa01 100644
--- a/zenserver/cache/structuredcache.cpp
+++ b/zenserver/cache/structuredcache.cpp
@@ -747,8 +747,8 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request
{
if (IoBuffer Chunk = m_CidStore.FindChunkByCid(AttachmentHash.AsHash()))
{
- Package.AddAttachment(
- CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Chunk)), AttachmentHash.AsHash()));
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(std::move(Chunk));
+ Package.AddAttachment(CbAttachment(Compressed, AttachmentHash.AsHash()));
}
else
{
@@ -907,7 +907,7 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request
}
else if (IoBuffer Chunk = m_CidStore.FindChunkByCid(Hash))
{
- CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Chunk));
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(std::move(Chunk));
if (Compressed)
{
Package.AddAttachment(CbAttachment(Compressed, Hash));
@@ -939,8 +939,9 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request
for (const CbAttachment* Attachment : AttachmentsToStoreLocally)
{
- CompressedBuffer Chunk = Attachment->AsCompressedBinary();
- CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Chunk);
+ CompressedBuffer Chunk = Attachment->AsCompressedBinary();
+ CidStore::InsertResult InsertResult =
+ m_CidStore.AddChunk(Chunk.GetCompressed().Flatten().AsIoBuffer(), Attachment->GetHash());
if (InsertResult.New)
{
Count.New++;
@@ -1173,7 +1174,7 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request
for (const CbAttachment* Attachment : AttachmentsToStoreLocally)
{
CompressedBuffer Chunk = Attachment->AsCompressedBinary();
- CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Chunk);
+ CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Chunk.GetCompressed().Flatten().AsIoBuffer(), Attachment->GetHash());
if (InsertResult.New)
{
Count.New++;
@@ -1243,10 +1244,19 @@ HttpStructuredCacheService::HandleGetCacheChunk(zen::HttpServerRequest& Request,
m_UpstreamCache.GetCacheChunk(Ref.Namespace, {Ref.BucketSegment, Ref.HashKey}, Ref.ValueContentId);
UpstreamResult.Status.Success)
{
- if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(UpstreamResult.Value)))
+ IoHash RawHash;
+ uint64_t RawSize;
+ if (CompressedBuffer::ValidateCompressedHeader(UpstreamResult.Value, RawHash, RawSize))
{
- m_CidStore.AddChunk(Compressed);
- Source = UpstreamResult.Source;
+ if (RawHash == Ref.ValueContentId)
+ {
+ m_CidStore.AddChunk(UpstreamResult.Value, RawHash);
+ Source = UpstreamResult.Source;
+ }
+ else
+ {
+ ZEN_WARN("got missmatching upstream cache value");
+ }
}
else
{
@@ -1312,21 +1322,21 @@ HttpStructuredCacheService::HandlePutCacheChunk(zen::HttpServerRequest& Request,
Body.SetContentType(Request.RequestContentType());
- CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Body));
-
- if (!Compressed)
+ IoHash RawHash;
+ uint64_t RawSize;
+ if (!CompressedBuffer::ValidateCompressedHeader(Body, RawHash, RawSize))
{
return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Attachments must be compressed"sv);
}
- if (IoHash::FromBLAKE3(Compressed.GetRawHash()) != Ref.ValueContentId)
+ if (RawHash != Ref.ValueContentId)
{
return Request.WriteResponse(HttpResponseCode::BadRequest,
HttpContentType::kText,
"ValueContentId does not match attachment hash"sv);
}
- CidStore::InsertResult Result = m_CidStore.AddChunk(Compressed);
+ CidStore::InsertResult Result = m_CidStore.AddChunk(Body, RawHash);
ZEN_DEBUG("PUTCACHECHUNK - '{}/{}/{}/{}' {} '{}' ({}) in {}",
Ref.Namespace,
@@ -1625,7 +1635,7 @@ HttpStructuredCacheService::PutCacheRecord(PutRequestData& Request, const CbPack
for (const CbAttachment* Attachment : AttachmentsToStoreLocally)
{
CompressedBuffer Chunk = Attachment->AsCompressedBinary();
- CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Chunk);
+ CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Chunk.GetCompressed().Flatten().AsIoBuffer(), Attachment->GetHash());
if (InsertResult.New)
{
Count.New++;
@@ -1788,7 +1798,7 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(CbObjectView RpcRequest)
if (IoBuffer Chunk = m_CidStore.FindChunkByCid(Value.ContentId))
{
ZEN_ASSERT(Chunk.GetSize() > 0);
- Value.Payload = CompressedBuffer::FromCompressed(SharedBuffer(Chunk));
+ Value.Payload = CompressedBuffer::FromCompressedNoValidate(std::move(Chunk));
Value.Exists = true;
}
else
@@ -1900,7 +1910,7 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(CbObjectView RpcRequest)
Value.Exists = true;
if (EnumHasAllFlags(ValuePolicy, CachePolicy::StoreLocal))
{
- m_CidStore.AddChunk(Compressed);
+ m_CidStore.AddChunk(Compressed.GetCompressed().Flatten().AsIoBuffer(), Attachment->GetHash());
}
if (!EnumHasAllFlags(ValuePolicy, CachePolicy::SkipData))
{
@@ -1947,7 +1957,7 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(CbObjectView RpcRequest)
{
if (!EnumHasAllFlags(Value.DownstreamPolicy, CachePolicy::SkipData) && Value.Payload)
{
- ResponsePackage.AddAttachment(CbAttachment(Value.Payload, IoHash::FromBLAKE3(Value.Payload.GetRawHash())));
+ ResponsePackage.AddAttachment(CbAttachment(Value.Payload, Value.ContentId));
}
}
@@ -2139,25 +2149,24 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(CbObjectView RpcRequest)
PolicyText = RequestObject["Policy"sv].AsString();
Request.Policy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : DefaultPolicy;
- CacheKey& Key = Request.Key;
- CachePolicy Policy = Request.Policy;
- CompressedBuffer& Result = Request.Result;
+ CacheKey& Key = Request.Key;
+ CachePolicy Policy = Request.Policy;
ZenCacheValue CacheValue;
if (EnumHasAllFlags(Policy, CachePolicy::QueryLocal))
{
if (m_CacheStore.Get(*Namespace, Key.Bucket, Key.Hash, CacheValue) && IsCompressedBinary(CacheValue.Value.GetContentType()))
{
- Result = CompressedBuffer::FromCompressed(SharedBuffer(CacheValue.Value));
+ Request.Result = CompressedBuffer::FromCompressed(SharedBuffer(CacheValue.Value), Request.RawHash, Request.RawSize);
}
}
- if (Result)
+ if (Request.Result)
{
ZEN_DEBUG("GETCACHEVALUES HIT - '{}/{}/{}' {} ({}) in {}",
*Namespace,
Key.Bucket,
Key.Hash,
- NiceBytes(Result.GetCompressed().GetSize()),
+ NiceBytes(Request.Result.GetCompressed().GetSize()),
"LOCAL"sv,
NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
m_CacheStats.HitCount++;
@@ -2216,7 +2225,7 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(CbObjectView RpcRequest)
{
if (HasData && !SkipData)
{
- Request.Result = CompressedBuffer::FromCompressed(SharedBuffer(Params.Value));
+ Request.Result = CompressedBuffer::FromCompressed(SharedBuffer(Params.Value), Request.RawHash, Request.RawSize);
}
if (HasData && StoreData)
@@ -2261,15 +2270,14 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(CbObjectView RpcRequest)
const CompressedBuffer& Result = Request.Result;
if (Result)
{
- IoHash Hash = IoHash::FromBLAKE3(Result.GetRawHash());
- ResponseObject.AddHash("RawHash"sv, Hash);
+ ResponseObject.AddHash("RawHash"sv, Request.RawHash);
if (!EnumHasAllFlags(Request.Policy, CachePolicy::SkipData))
{
- RpcResponse.AddAttachment(CbAttachment(Result, Hash));
+ RpcResponse.AddAttachment(CbAttachment(Result, Request.RawHash));
}
else
{
- ResponseObject.AddInteger("RawSize"sv, Result.GetRawSize());
+ ResponseObject.AddInteger("RawSize"sv, Request.RawSize);
}
}
else if (Request.RawHash != IoHash::Zero)
@@ -2310,12 +2318,12 @@ namespace cache::detail {
RecordBody* Record = nullptr;
CompressedBuffer Value;
const UpstreamEndpointInfo* Source = nullptr;
- uint64_t TotalSize = 0;
+ uint64_t RawSize = 0;
uint64_t RequestedSize = 0;
uint64_t RequestedOffset = 0;
CachePolicy DownstreamPolicy;
bool Exists = false;
- bool TotalSizeKnown = false;
+ bool RawSizeKnown = false;
bool IsRecordRequest = false;
uint64_t ElapsedTimeUs = 0;
};
@@ -2569,9 +2577,9 @@ HttpStructuredCacheService::GetLocalCacheRecords(std::string_view Namespac
{
if (Value.ValueId == Request->Key->ValueId)
{
- Request->Key->ChunkId = Value.ContentId;
- Request->TotalSize = Value.RawSize;
- Request->TotalSizeKnown = true;
+ Request->Key->ChunkId = Value.ContentId;
+ Request->RawSize = Value.RawSize;
+ Request->RawSizeKnown = true;
break;
}
}
@@ -2582,7 +2590,7 @@ HttpStructuredCacheService::GetLocalCacheRecords(std::string_view Namespac
{
if (EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::QueryLocal))
{
- if (EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::SkipData) && Request->TotalSizeKnown)
+ if (EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::SkipData) && Request->RawSizeKnown)
{
if (m_CidStore.ContainsChunk(Request->Key->ChunkId))
{
@@ -2591,16 +2599,16 @@ HttpStructuredCacheService::GetLocalCacheRecords(std::string_view Namespac
}
else if (IoBuffer Payload = m_CidStore.FindChunkByCid(Request->Key->ChunkId))
{
- CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Payload));
+ IoHash RawHash;
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Payload), RawHash, Request->RawSize);
if (Compressed)
{
if (!EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::SkipData))
{
Request->Value = Compressed;
}
- Request->Exists = true;
- Request->TotalSize = Compressed.GetRawSize();
- Request->TotalSizeKnown = true;
+ Request->Exists = true;
+ Request->RawSizeKnown = true;
}
}
}
@@ -2631,17 +2639,19 @@ HttpStructuredCacheService::GetLocalCacheValues(std::string_view Namespa
{
if (IsCompressedBinary(CacheValue.Value.GetContentType()))
{
- CompressedBuffer Result = CompressedBuffer::FromCompressed(SharedBuffer(CacheValue.Value));
+ IoHash RawHash;
+ uint64_t RawSize;
+ CompressedBuffer Result = CompressedBuffer::FromCompressed(SharedBuffer(CacheValue.Value), RawHash, RawSize);
if (Result)
{
if (!EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::SkipData))
{
Request->Value = Result;
}
- Request->Key->ChunkId = IoHash::FromBLAKE3(Result.GetRawHash());
- Request->Exists = true;
- Request->TotalSize = Result.GetRawSize();
- Request->TotalSizeKnown = true;
+ Request->Key->ChunkId = RawHash;
+ Request->Exists = true;
+ Request->RawSize = RawSize;
+ Request->RawSizeKnown = true;
}
}
}
@@ -2683,9 +2693,10 @@ HttpStructuredCacheService::GetUpstreamCacheChunks(std::string_view Names
if (EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::StoreLocal) ||
!EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::SkipData))
{
- CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Params.Value));
- if (!Compressed || Compressed.GetRawSize() != Params.RawSize ||
- IoHash::FromBLAKE3(Compressed.GetRawHash()) != Params.RawHash)
+ IoHash RawHash;
+ uint64_t RawSize;
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Params.Value), RawHash, RawSize);
+ if (!Compressed || RawSize != Params.RawSize || RawHash != Params.RawHash)
{
return;
}
@@ -2694,7 +2705,7 @@ HttpStructuredCacheService::GetUpstreamCacheChunks(std::string_view Names
{
if (Request.IsRecordRequest)
{
- m_CidStore.AddChunk(Compressed);
+ m_CidStore.AddChunk(Params.Value, RawHash);
}
else
{
@@ -2706,11 +2717,11 @@ HttpStructuredCacheService::GetUpstreamCacheChunks(std::string_view Names
Request.Value = std::move(Compressed);
}
}
- Key.ChunkId = Params.RawHash;
- Request.Exists = true;
- Request.TotalSize = Params.RawSize;
- Request.TotalSizeKnown = true;
- Request.Source = Params.Source;
+ Key.ChunkId = Params.RawHash;
+ Request.Exists = true;
+ Request.RawSize = Params.RawSize;
+ Request.RawSizeKnown = true;
+ Request.Source = Params.Source;
m_CacheStats.UpstreamHitCount++;
};
@@ -2741,7 +2752,7 @@ HttpStructuredCacheService::WriteGetCacheChunksResponse(std::string_view Namespa
}
else
{
- Writer.AddInteger("RawSize"sv, Request.TotalSize);
+ Writer.AddInteger("RawSize"sv, Request.RawSize);
}
ZEN_DEBUG("GETCACHECHUNKS HIT - '{}/{}/{}/{}' {} '{}' ({}) in {}",
@@ -2749,7 +2760,7 @@ HttpStructuredCacheService::WriteGetCacheChunksResponse(std::string_view Namespa
Request.Key->Key.Bucket,
Request.Key->Key.Hash,
Request.Key->ValueId,
- NiceBytes(Request.TotalSize),
+ NiceBytes(Request.RawSize),
Request.IsRecordRequest ? "Record"sv : "Value"sv,
Request.Source ? Request.Source->Url : "LOCAL"sv,
NiceLatencyNs(Request.ElapsedTimeUs * 1000));