aboutsummaryrefslogtreecommitdiff
path: root/zenserver/cache/structuredcache.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2022-11-18 12:02:41 +0100
committerDan Engelbrecht <[email protected]>2022-11-25 10:17:34 +0100
commit5041f3521c2b3074032c06ed04a391ed90a06c7e (patch)
treefbe6c18c79de5abff79394b69d65e38004d9e39e /zenserver/cache/structuredcache.cpp
parent0.1.9 (diff)
downloadzen-5041f3521c2b3074032c06ed04a391ed90a06c7e.tar.xz
zen-5041f3521c2b3074032c06ed04a391ed90a06c7e.zip
reduce parsing of compressed headers
Diffstat (limited to 'zenserver/cache/structuredcache.cpp')
-rw-r--r--zenserver/cache/structuredcache.cpp204
1 files changed, 109 insertions, 95 deletions
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp
index dfb69c0fe..d1630a4af 100644
--- a/zenserver/cache/structuredcache.cpp
+++ b/zenserver/cache/structuredcache.cpp
@@ -530,8 +530,8 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request
{
if (IoBuffer Chunk = m_CidStore.FindChunkByCid(AttachmentHash.AsHash()))
{
- Package.AddAttachment(
- CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Chunk)), AttachmentHash.AsHash()));
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(std::move(Chunk));
+ Package.AddAttachment(CbAttachment(Compressed, AttachmentHash.AsHash()));
}
else
{
@@ -588,12 +588,12 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request
}
else if (!EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryRemote))
{
- ZEN_DEBUG("GETCACHERECORD MISS - '{}/{}/{}' '{}' in {}",
- Ref.Namespace,
- Ref.BucketSegment,
- Ref.HashKey,
- ToString(AcceptType),
- NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
+ ZEN_INFO("GETCACHERECORD MISS - '{}/{}/{}' '{}' in {}",
+ Ref.Namespace,
+ Ref.BucketSegment,
+ Ref.HashKey,
+ ToString(AcceptType),
+ NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
m_CacheStats.MissCount++;
return Request.WriteResponse(HttpResponseCode::NotFound);
}
@@ -655,13 +655,15 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request
AttachmentCount Count;
CacheRecord.IterateAttachments([this, &Package, &Ref, &Count, QueryLocal, StoreLocal](CbFieldView HashView) {
- if (const CbAttachment* Attachment = Package.FindAttachment(HashView.AsHash()))
+ IoHash Hash = HashView.AsHash();
+ if (const CbAttachment* Attachment = Package.FindAttachment(Hash))
{
if (CompressedBuffer Compressed = Attachment->AsCompressedBinary())
{
if (StoreLocal)
{
- auto InsertResult = m_CidStore.AddChunk(Compressed);
+ auto InsertResult =
+ m_CidStore.AddChunk(Compressed.GetCompressed().Flatten().AsIoBuffer(), Attachment->GetHash());
if (InsertResult.New)
{
Count.New++;
@@ -672,7 +674,7 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request
else
{
ZEN_WARN("Uncompressed value '{}' from upstream cache record '{}/{}'",
- HashView.AsHash(),
+ Hash,
Ref.BucketSegment,
Ref.HashKey);
Count.Invalid++;
@@ -680,10 +682,9 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request
}
else if (QueryLocal)
{
- if (IoBuffer Chunk = m_CidStore.FindChunkByCid(HashView.AsHash()))
+ if (IoBuffer Chunk = m_CidStore.FindChunkByCid(Hash))
{
- Package.AddAttachment(
- CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Chunk)), HashView.AsHash()));
+ Package.AddAttachment(CbAttachment(CompressedBuffer::FromCompressedNoValidate(std::move(Chunk)), Hash));
Count.Valid++;
}
}
@@ -762,12 +763,12 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request
}
else
{
- ZEN_DEBUG("GETCACHERECORD MISS - '{}/{}/{}' '{}' in {}",
- Ref.Namespace,
- Ref.BucketSegment,
- Ref.HashKey,
- ToString(AcceptType),
- NiceLatencyNs((LocalElapsedTimeUs + Timer.GetElapsedTimeUs()) * 1000));
+ ZEN_INFO("GETCACHERECORD MISS - '{}/{}/{}' '{}' in {}",
+ Ref.Namespace,
+ Ref.BucketSegment,
+ Ref.HashKey,
+ ToString(AcceptType),
+ NiceLatencyNs((LocalElapsedTimeUs + Timer.GetElapsedTimeUs()) * 1000));
m_CacheStats.MissCount++;
AsyncRequest.WriteResponse(HttpResponseCode::NotFound);
}
@@ -889,7 +890,7 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request
if (Attachment->IsCompressedBinary())
{
CompressedBuffer Chunk = Attachment->AsCompressedBinary();
- CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Chunk);
+ CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Chunk.GetCompressed().Flatten().AsIoBuffer(), Hash);
ValidAttachments.emplace_back(Hash);
@@ -992,10 +993,19 @@ HttpStructuredCacheService::HandleGetCacheChunk(zen::HttpServerRequest& Request,
m_UpstreamCache.GetCacheChunk(Ref.Namespace, {Ref.BucketSegment, Ref.HashKey}, Ref.ValueContentId);
UpstreamResult.Status.Success)
{
- if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(UpstreamResult.Value)))
+ IoHash RawHash;
+ uint64_t RawSize;
+ if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(UpstreamResult.Value), RawHash, RawSize))
{
- m_CidStore.AddChunk(Compressed);
- Source = UpstreamResult.Source;
+ if (RawHash == Ref.ValueContentId)
+ {
+ m_CidStore.AddChunk(UpstreamResult.Value, RawHash);
+ Source = UpstreamResult.Source;
+ }
+ else
+ {
+ ZEN_WARN("got missmatching upstream cache value");
+ }
}
else
{
@@ -1007,13 +1017,13 @@ HttpStructuredCacheService::HandleGetCacheChunk(zen::HttpServerRequest& Request,
if (!Value)
{
- ZEN_DEBUG("GETCACHECHUNK MISS - '{}/{}/{}/{}' '{}' in {}",
- Ref.Namespace,
- Ref.BucketSegment,
- Ref.HashKey,
- Ref.ValueContentId,
- ToString(Request.AcceptContentType()),
- NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
+ ZEN_INFO("GETCACHECHUNK MISS - '{}/{}/{}/{}' '{}' in {}",
+ Ref.Namespace,
+ Ref.BucketSegment,
+ Ref.HashKey,
+ Ref.ValueContentId,
+ ToString(Request.AcceptContentType()),
+ NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
m_CacheStats.MissCount++;
return Request.WriteResponse(HttpResponseCode::NotFound);
}
@@ -1061,21 +1071,23 @@ HttpStructuredCacheService::HandlePutCacheChunk(zen::HttpServerRequest& Request,
Body.SetContentType(Request.RequestContentType());
- CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Body));
+ IoHash RawHash;
+ uint64_t RawSize;
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Body), RawHash, RawSize);
if (!Compressed)
{
return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Attachments must be compressed"sv);
}
- if (IoHash::FromBLAKE3(Compressed.GetRawHash()) != Ref.ValueContentId)
+ if (RawHash != Ref.ValueContentId)
{
return Request.WriteResponse(HttpResponseCode::BadRequest,
HttpContentType::kText,
"ValueContentId does not match attachment hash"sv);
}
- CidStore::InsertResult Result = m_CidStore.AddChunk(Compressed);
+ CidStore::InsertResult Result = m_CidStore.AddChunk(Body, RawHash);
ZEN_DEBUG("PUTCACHECHUNK - '{}/{}/{}/{}' {} '{}' ({}) in {}",
Ref.Namespace,
@@ -1247,8 +1259,9 @@ HttpStructuredCacheService::PutCacheRecord(PutRequestData& Request, const CbPack
{
if (Attachment->IsCompressedBinary())
{
- CompressedBuffer Chunk = Attachment->AsCompressedBinary();
- CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Chunk);
+ CompressedBuffer Chunk = Attachment->AsCompressedBinary();
+ CidStore::InsertResult InsertResult =
+ m_CidStore.AddChunk(Chunk.GetCompressed().Flatten().AsIoBuffer(), Attachment->GetHash());
ValidAttachments.emplace_back(ValueHash);
@@ -1448,7 +1461,7 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Htt
if (IoBuffer Chunk = m_CidStore.FindChunkByCid(Value.ContentId))
{
ZEN_ASSERT(Chunk.GetSize() > 0);
- Value.Payload = CompressedBuffer::FromCompressed(SharedBuffer(Chunk));
+ Value.Payload = CompressedBuffer::FromCompressedNoValidate(std::move(Chunk));
Value.Exists = true;
}
else
@@ -1560,7 +1573,7 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Htt
Value.Exists = true;
if (EnumHasAllFlags(ValuePolicy, CachePolicy::StoreLocal))
{
- m_CidStore.AddChunk(Compressed);
+ m_CidStore.AddChunk(Compressed.GetCompressed().Flatten().AsIoBuffer(), Attachment->GetHash());
}
if (!EnumHasAllFlags(ValuePolicy, CachePolicy::SkipData))
{
@@ -1607,7 +1620,7 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Htt
{
if (!EnumHasAllFlags(Value.DownstreamPolicy, CachePolicy::SkipData) && Value.Payload)
{
- ResponsePackage.AddAttachment(CbAttachment(Value.Payload, IoHash::FromBLAKE3(Value.Payload.GetRawHash())));
+ ResponsePackage.AddAttachment(CbAttachment(Value.Payload, Value.ContentId));
}
}
@@ -1637,13 +1650,13 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Htt
}
else
{
- ZEN_DEBUG("GETCACHERECORD MISS - '{}/{}/{}'{} ({}) in {}",
- *Namespace,
- Key.Bucket,
- Key.Hash,
- Request.RecordObject ? ""sv : " (PARTIAL)"sv,
- Request.Source ? Request.Source->Url : "LOCAL"sv,
- NiceLatencyNs(Request.ElapsedTimeUs * 1000));
+ ZEN_INFO("GETCACHERECORD MISS - '{}/{}/{}'{} ({}) in {}",
+ *Namespace,
+ Key.Bucket,
+ Key.Hash,
+ Request.RecordObject ? " (PARTIAL)"sv : ""sv,
+ Request.Source ? Request.Source->Url : "LOCAL"sv,
+ NiceLatencyNs(Request.ElapsedTimeUs * 1000));
m_CacheStats.MissCount++;
}
}
@@ -1839,25 +1852,24 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(zen::HttpServerRequest& Http
PolicyText = RequestObject["Policy"sv].AsString();
Request.Policy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : DefaultPolicy;
- CacheKey& Key = Request.Key;
- CachePolicy Policy = Request.Policy;
- CompressedBuffer& Result = Request.Result;
+ CacheKey& Key = Request.Key;
+ CachePolicy Policy = Request.Policy;
ZenCacheValue CacheValue;
if (EnumHasAllFlags(Policy, CachePolicy::QueryLocal))
{
if (m_CacheStore.Get(*Namespace, Key.Bucket, Key.Hash, CacheValue) && IsCompressedBinary(CacheValue.Value.GetContentType()))
{
- Result = CompressedBuffer::FromCompressed(SharedBuffer(CacheValue.Value));
+ Request.Result = CompressedBuffer::FromCompressed(SharedBuffer(CacheValue.Value), Request.RawHash, Request.RawSize);
}
}
- if (Result)
+ if (Request.Result)
{
ZEN_DEBUG("GETCACHEVALUES HIT - '{}/{}/{}' {} ({}) in {}",
*Namespace,
Key.Bucket,
Key.Hash,
- NiceBytes(Result.GetCompressed().GetSize()),
+ NiceBytes(Request.Result.GetCompressed().GetSize()),
"LOCAL"sv,
NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
m_CacheStats.HitCount++;
@@ -1873,12 +1885,12 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(zen::HttpServerRequest& Http
}
else
{
- ZEN_DEBUG("GETCACHEVALUES MISS - '{}/{}/{}' ({}) in {}",
- *Namespace,
- Key.Bucket,
- Key.Hash,
- "LOCAL"sv,
- NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
+ ZEN_INFO("GETCACHEVALUES MISS - '{}/{}/{}' ({}) in {}",
+ *Namespace,
+ Key.Bucket,
+ Key.Hash,
+ "LOCAL"sv,
+ NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
m_CacheStats.MissCount++;
}
}
@@ -1916,7 +1928,7 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(zen::HttpServerRequest& Http
{
if (HasData && !SkipData)
{
- Request.Result = CompressedBuffer::FromCompressed(SharedBuffer(Params.Value));
+ Request.Result = CompressedBuffer::FromCompressed(SharedBuffer(Params.Value), Request.RawHash, Request.RawSize);
}
if (HasData && StoreData)
@@ -1961,15 +1973,14 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(zen::HttpServerRequest& Http
const CompressedBuffer& Result = Request.Result;
if (Result)
{
- IoHash Hash = IoHash::FromBLAKE3(Result.GetRawHash());
- ResponseObject.AddHash("RawHash"sv, Hash);
+ ResponseObject.AddHash("RawHash"sv, Request.RawHash);
if (!EnumHasAllFlags(Request.Policy, CachePolicy::SkipData))
{
- RpcResponse.AddAttachment(CbAttachment(Result, Hash));
+ RpcResponse.AddAttachment(CbAttachment(Result, Request.RawHash));
}
else
{
- ResponseObject.AddInteger("RawSize"sv, Result.GetRawSize());
+ ResponseObject.AddInteger("RawSize"sv, Request.RawSize);
}
}
else if (Request.RawHash != IoHash::Zero)
@@ -2026,12 +2037,12 @@ namespace cache::detail {
RecordBody* Record = nullptr;
CompressedBuffer Value;
const UpstreamEndpointInfo* Source = nullptr;
- uint64_t TotalSize = 0;
+ uint64_t RawSize = 0;
uint64_t RequestedSize = 0;
uint64_t RequestedOffset = 0;
CachePolicy DownstreamPolicy;
bool Exists = false;
- bool TotalSizeKnown = false;
+ bool RawSizeKnown = false;
bool IsRecordRequest = false;
uint64_t ElapsedTimeUs = 0;
};
@@ -2303,9 +2314,9 @@ HttpStructuredCacheService::GetLocalCacheRecords(std::string_view Namespac
{
if (Value.ValueId == Request->Key->ValueId)
{
- Request->Key->ChunkId = Value.ContentId;
- Request->TotalSize = Value.RawSize;
- Request->TotalSizeKnown = true;
+ Request->Key->ChunkId = Value.ContentId;
+ Request->RawSize = Value.RawSize;
+ Request->RawSizeKnown = true;
break;
}
}
@@ -2316,7 +2327,7 @@ HttpStructuredCacheService::GetLocalCacheRecords(std::string_view Namespac
{
if (EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::QueryLocal))
{
- if (EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::SkipData) && Request->TotalSizeKnown)
+ if (EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::SkipData) && Request->RawSizeKnown)
{
if (m_CidStore.ContainsChunk(Request->Key->ChunkId))
{
@@ -2325,16 +2336,16 @@ HttpStructuredCacheService::GetLocalCacheRecords(std::string_view Namespac
}
else if (IoBuffer Payload = m_CidStore.FindChunkByCid(Request->Key->ChunkId))
{
- CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Payload));
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(std::move(Payload));
if (Compressed)
{
if (!EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::SkipData))
{
Request->Value = Compressed;
}
- Request->Exists = true;
- Request->TotalSize = Compressed.GetRawSize();
- Request->TotalSizeKnown = true;
+ Request->Exists = true;
+ Request->RawSize = Compressed.DecodeRawSize();
+ Request->RawSizeKnown = true;
}
}
}
@@ -2365,17 +2376,19 @@ HttpStructuredCacheService::GetLocalCacheValues(std::string_view Namespa
{
if (IsCompressedBinary(CacheValue.Value.GetContentType()))
{
- CompressedBuffer Result = CompressedBuffer::FromCompressed(SharedBuffer(CacheValue.Value));
+ IoHash RawHash;
+ uint64_t RawSize;
+ CompressedBuffer Result = CompressedBuffer::FromCompressed(SharedBuffer(CacheValue.Value), RawHash, RawSize);
if (Result)
{
if (!EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::SkipData))
{
Request->Value = Result;
}
- Request->Key->ChunkId = IoHash::FromBLAKE3(Result.GetRawHash());
- Request->Exists = true;
- Request->TotalSize = Result.GetRawSize();
- Request->TotalSizeKnown = true;
+ Request->Key->ChunkId = RawHash;
+ Request->Exists = true;
+ Request->RawSize = RawSize;
+ Request->RawSizeKnown = true;
}
}
}
@@ -2417,9 +2430,10 @@ HttpStructuredCacheService::GetUpstreamCacheChunks(std::string_view Names
if (EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::StoreLocal) ||
!EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::SkipData))
{
- CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Params.Value));
- if (!Compressed || Compressed.GetRawSize() != Params.RawSize ||
- IoHash::FromBLAKE3(Compressed.GetRawHash()) != Params.RawHash)
+ IoHash RawHash;
+ uint64_t RawSize;
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Params.Value), RawHash, RawSize);
+ if (!Compressed || RawSize != Params.RawSize || RawHash != Params.RawHash)
{
return;
}
@@ -2428,7 +2442,7 @@ HttpStructuredCacheService::GetUpstreamCacheChunks(std::string_view Names
{
if (Request.IsRecordRequest)
{
- m_CidStore.AddChunk(Compressed);
+ m_CidStore.AddChunk(Compressed.GetCompressed().Flatten().AsIoBuffer(), RawHash);
}
else
{
@@ -2440,11 +2454,11 @@ HttpStructuredCacheService::GetUpstreamCacheChunks(std::string_view Names
Request.Value = std::move(Compressed);
}
}
- Key.ChunkId = Params.RawHash;
- Request.Exists = true;
- Request.TotalSize = Params.RawSize;
- Request.TotalSizeKnown = true;
- Request.Source = Params.Source;
+ Key.ChunkId = Params.RawHash;
+ Request.Exists = true;
+ Request.RawSize = Params.RawSize;
+ Request.RawSizeKnown = true;
+ Request.Source = Params.Source;
m_CacheStats.UpstreamHitCount++;
};
@@ -2482,7 +2496,7 @@ HttpStructuredCacheService::WriteGetCacheChunksResponse(uint32_t Accept
}
else
{
- Writer.AddInteger("RawSize"sv, Request.TotalSize);
+ Writer.AddInteger("RawSize"sv, Request.RawSize);
}
ZEN_DEBUG("GETCACHECHUNKS HIT - '{}/{}/{}/{}' {} '{}' ({}) in {}",
@@ -2490,7 +2504,7 @@ HttpStructuredCacheService::WriteGetCacheChunksResponse(uint32_t Accept
Request.Key->Key.Bucket,
Request.Key->Key.Hash,
Request.Key->ValueId,
- NiceBytes(Request.TotalSize),
+ NiceBytes(Request.RawSize),
Request.IsRecordRequest ? "Record"sv : "Value"sv,
Request.Source ? Request.Source->Url : "LOCAL"sv,
NiceLatencyNs(Request.ElapsedTimeUs * 1000));
@@ -2507,12 +2521,12 @@ HttpStructuredCacheService::WriteGetCacheChunksResponse(uint32_t Accept
}
else
{
- ZEN_DEBUG("GETCACHECHUNKS MISS - '{}/{}/{}/{}' in {}",
- Namespace,
- Request.Key->Key.Bucket,
- Request.Key->Key.Hash,
- Request.Key->ValueId,
- NiceLatencyNs(Request.ElapsedTimeUs * 1000));
+ ZEN_INFO("GETCACHECHUNKS MISS - '{}/{}/{}/{}' in {}",
+ Namespace,
+ Request.Key->Key.Bucket,
+ Request.Key->Key.Hash,
+ Request.Key->ValueId,
+ NiceLatencyNs(Request.ElapsedTimeUs * 1000));
m_CacheStats.MissCount++;
}
}