aboutsummaryrefslogtreecommitdiff
path: root/zenserver/cache
diff options
context:
space:
mode:
Diffstat (limited to 'zenserver/cache')
-rw-r--r--zenserver/cache/structuredcache.cpp121
-rw-r--r--zenserver/cache/structuredcachestore.cpp47
2 files changed, 91 insertions, 77 deletions
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp
index baaf94dd0..f649efa01 100644
--- a/zenserver/cache/structuredcache.cpp
+++ b/zenserver/cache/structuredcache.cpp
@@ -747,8 +747,8 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request
{
if (IoBuffer Chunk = m_CidStore.FindChunkByCid(AttachmentHash.AsHash()))
{
- Package.AddAttachment(
- CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Chunk)), AttachmentHash.AsHash()));
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(std::move(Chunk));
+ Package.AddAttachment(CbAttachment(Compressed, AttachmentHash.AsHash()));
}
else
{
@@ -907,7 +907,7 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request
}
else if (IoBuffer Chunk = m_CidStore.FindChunkByCid(Hash))
{
- CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Chunk));
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(std::move(Chunk));
if (Compressed)
{
Package.AddAttachment(CbAttachment(Compressed, Hash));
@@ -939,8 +939,9 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request
for (const CbAttachment* Attachment : AttachmentsToStoreLocally)
{
- CompressedBuffer Chunk = Attachment->AsCompressedBinary();
- CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Chunk);
+ CompressedBuffer Chunk = Attachment->AsCompressedBinary();
+ CidStore::InsertResult InsertResult =
+ m_CidStore.AddChunk(Chunk.GetCompressed().Flatten().AsIoBuffer(), Attachment->GetHash());
if (InsertResult.New)
{
Count.New++;
@@ -1173,7 +1174,7 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request
for (const CbAttachment* Attachment : AttachmentsToStoreLocally)
{
CompressedBuffer Chunk = Attachment->AsCompressedBinary();
- CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Chunk);
+ CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Chunk.GetCompressed().Flatten().AsIoBuffer(), Attachment->GetHash());
if (InsertResult.New)
{
Count.New++;
@@ -1243,10 +1244,19 @@ HttpStructuredCacheService::HandleGetCacheChunk(zen::HttpServerRequest& Request,
m_UpstreamCache.GetCacheChunk(Ref.Namespace, {Ref.BucketSegment, Ref.HashKey}, Ref.ValueContentId);
UpstreamResult.Status.Success)
{
- if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(UpstreamResult.Value)))
+ IoHash RawHash;
+ uint64_t RawSize;
+ if (CompressedBuffer::ValidateCompressedHeader(UpstreamResult.Value, RawHash, RawSize))
{
- m_CidStore.AddChunk(Compressed);
- Source = UpstreamResult.Source;
+ if (RawHash == Ref.ValueContentId)
+ {
+ m_CidStore.AddChunk(UpstreamResult.Value, RawHash);
+ Source = UpstreamResult.Source;
+ }
+ else
+ {
+ ZEN_WARN("got missmatching upstream cache value");
+ }
}
else
{
@@ -1312,21 +1322,21 @@ HttpStructuredCacheService::HandlePutCacheChunk(zen::HttpServerRequest& Request,
Body.SetContentType(Request.RequestContentType());
- CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Body));
-
- if (!Compressed)
+ IoHash RawHash;
+ uint64_t RawSize;
+ if (!CompressedBuffer::ValidateCompressedHeader(Body, RawHash, RawSize))
{
return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Attachments must be compressed"sv);
}
- if (IoHash::FromBLAKE3(Compressed.GetRawHash()) != Ref.ValueContentId)
+ if (RawHash != Ref.ValueContentId)
{
return Request.WriteResponse(HttpResponseCode::BadRequest,
HttpContentType::kText,
"ValueContentId does not match attachment hash"sv);
}
- CidStore::InsertResult Result = m_CidStore.AddChunk(Compressed);
+ CidStore::InsertResult Result = m_CidStore.AddChunk(Body, RawHash);
ZEN_DEBUG("PUTCACHECHUNK - '{}/{}/{}/{}' {} '{}' ({}) in {}",
Ref.Namespace,
@@ -1625,7 +1635,7 @@ HttpStructuredCacheService::PutCacheRecord(PutRequestData& Request, const CbPack
for (const CbAttachment* Attachment : AttachmentsToStoreLocally)
{
CompressedBuffer Chunk = Attachment->AsCompressedBinary();
- CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Chunk);
+ CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Chunk.GetCompressed().Flatten().AsIoBuffer(), Attachment->GetHash());
if (InsertResult.New)
{
Count.New++;
@@ -1788,7 +1798,7 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(CbObjectView RpcRequest)
if (IoBuffer Chunk = m_CidStore.FindChunkByCid(Value.ContentId))
{
ZEN_ASSERT(Chunk.GetSize() > 0);
- Value.Payload = CompressedBuffer::FromCompressed(SharedBuffer(Chunk));
+ Value.Payload = CompressedBuffer::FromCompressedNoValidate(std::move(Chunk));
Value.Exists = true;
}
else
@@ -1900,7 +1910,7 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(CbObjectView RpcRequest)
Value.Exists = true;
if (EnumHasAllFlags(ValuePolicy, CachePolicy::StoreLocal))
{
- m_CidStore.AddChunk(Compressed);
+ m_CidStore.AddChunk(Compressed.GetCompressed().Flatten().AsIoBuffer(), Attachment->GetHash());
}
if (!EnumHasAllFlags(ValuePolicy, CachePolicy::SkipData))
{
@@ -1947,7 +1957,7 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(CbObjectView RpcRequest)
{
if (!EnumHasAllFlags(Value.DownstreamPolicy, CachePolicy::SkipData) && Value.Payload)
{
- ResponsePackage.AddAttachment(CbAttachment(Value.Payload, IoHash::FromBLAKE3(Value.Payload.GetRawHash())));
+ ResponsePackage.AddAttachment(CbAttachment(Value.Payload, Value.ContentId));
}
}
@@ -2139,25 +2149,24 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(CbObjectView RpcRequest)
PolicyText = RequestObject["Policy"sv].AsString();
Request.Policy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : DefaultPolicy;
- CacheKey& Key = Request.Key;
- CachePolicy Policy = Request.Policy;
- CompressedBuffer& Result = Request.Result;
+ CacheKey& Key = Request.Key;
+ CachePolicy Policy = Request.Policy;
ZenCacheValue CacheValue;
if (EnumHasAllFlags(Policy, CachePolicy::QueryLocal))
{
if (m_CacheStore.Get(*Namespace, Key.Bucket, Key.Hash, CacheValue) && IsCompressedBinary(CacheValue.Value.GetContentType()))
{
- Result = CompressedBuffer::FromCompressed(SharedBuffer(CacheValue.Value));
+ Request.Result = CompressedBuffer::FromCompressed(SharedBuffer(CacheValue.Value), Request.RawHash, Request.RawSize);
}
}
- if (Result)
+ if (Request.Result)
{
ZEN_DEBUG("GETCACHEVALUES HIT - '{}/{}/{}' {} ({}) in {}",
*Namespace,
Key.Bucket,
Key.Hash,
- NiceBytes(Result.GetCompressed().GetSize()),
+ NiceBytes(Request.Result.GetCompressed().GetSize()),
"LOCAL"sv,
NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
m_CacheStats.HitCount++;
@@ -2216,7 +2225,7 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(CbObjectView RpcRequest)
{
if (HasData && !SkipData)
{
- Request.Result = CompressedBuffer::FromCompressed(SharedBuffer(Params.Value));
+ Request.Result = CompressedBuffer::FromCompressed(SharedBuffer(Params.Value), Request.RawHash, Request.RawSize);
}
if (HasData && StoreData)
@@ -2261,15 +2270,14 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(CbObjectView RpcRequest)
const CompressedBuffer& Result = Request.Result;
if (Result)
{
- IoHash Hash = IoHash::FromBLAKE3(Result.GetRawHash());
- ResponseObject.AddHash("RawHash"sv, Hash);
+ ResponseObject.AddHash("RawHash"sv, Request.RawHash);
if (!EnumHasAllFlags(Request.Policy, CachePolicy::SkipData))
{
- RpcResponse.AddAttachment(CbAttachment(Result, Hash));
+ RpcResponse.AddAttachment(CbAttachment(Result, Request.RawHash));
}
else
{
- ResponseObject.AddInteger("RawSize"sv, Result.GetRawSize());
+ ResponseObject.AddInteger("RawSize"sv, Request.RawSize);
}
}
else if (Request.RawHash != IoHash::Zero)
@@ -2310,12 +2318,12 @@ namespace cache::detail {
RecordBody* Record = nullptr;
CompressedBuffer Value;
const UpstreamEndpointInfo* Source = nullptr;
- uint64_t TotalSize = 0;
+ uint64_t RawSize = 0;
uint64_t RequestedSize = 0;
uint64_t RequestedOffset = 0;
CachePolicy DownstreamPolicy;
bool Exists = false;
- bool TotalSizeKnown = false;
+ bool RawSizeKnown = false;
bool IsRecordRequest = false;
uint64_t ElapsedTimeUs = 0;
};
@@ -2569,9 +2577,9 @@ HttpStructuredCacheService::GetLocalCacheRecords(std::string_view Namespac
{
if (Value.ValueId == Request->Key->ValueId)
{
- Request->Key->ChunkId = Value.ContentId;
- Request->TotalSize = Value.RawSize;
- Request->TotalSizeKnown = true;
+ Request->Key->ChunkId = Value.ContentId;
+ Request->RawSize = Value.RawSize;
+ Request->RawSizeKnown = true;
break;
}
}
@@ -2582,7 +2590,7 @@ HttpStructuredCacheService::GetLocalCacheRecords(std::string_view Namespac
{
if (EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::QueryLocal))
{
- if (EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::SkipData) && Request->TotalSizeKnown)
+ if (EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::SkipData) && Request->RawSizeKnown)
{
if (m_CidStore.ContainsChunk(Request->Key->ChunkId))
{
@@ -2591,16 +2599,16 @@ HttpStructuredCacheService::GetLocalCacheRecords(std::string_view Namespac
}
else if (IoBuffer Payload = m_CidStore.FindChunkByCid(Request->Key->ChunkId))
{
- CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Payload));
+ IoHash RawHash;
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Payload), RawHash, Request->RawSize);
if (Compressed)
{
if (!EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::SkipData))
{
Request->Value = Compressed;
}
- Request->Exists = true;
- Request->TotalSize = Compressed.GetRawSize();
- Request->TotalSizeKnown = true;
+ Request->Exists = true;
+ Request->RawSizeKnown = true;
}
}
}
@@ -2631,17 +2639,19 @@ HttpStructuredCacheService::GetLocalCacheValues(std::string_view Namespa
{
if (IsCompressedBinary(CacheValue.Value.GetContentType()))
{
- CompressedBuffer Result = CompressedBuffer::FromCompressed(SharedBuffer(CacheValue.Value));
+ IoHash RawHash;
+ uint64_t RawSize;
+ CompressedBuffer Result = CompressedBuffer::FromCompressed(SharedBuffer(CacheValue.Value), RawHash, RawSize);
if (Result)
{
if (!EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::SkipData))
{
Request->Value = Result;
}
- Request->Key->ChunkId = IoHash::FromBLAKE3(Result.GetRawHash());
- Request->Exists = true;
- Request->TotalSize = Result.GetRawSize();
- Request->TotalSizeKnown = true;
+ Request->Key->ChunkId = RawHash;
+ Request->Exists = true;
+ Request->RawSize = RawSize;
+ Request->RawSizeKnown = true;
}
}
}
@@ -2683,9 +2693,10 @@ HttpStructuredCacheService::GetUpstreamCacheChunks(std::string_view Names
if (EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::StoreLocal) ||
!EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::SkipData))
{
- CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Params.Value));
- if (!Compressed || Compressed.GetRawSize() != Params.RawSize ||
- IoHash::FromBLAKE3(Compressed.GetRawHash()) != Params.RawHash)
+ IoHash RawHash;
+ uint64_t RawSize;
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Params.Value), RawHash, RawSize);
+ if (!Compressed || RawSize != Params.RawSize || RawHash != Params.RawHash)
{
return;
}
@@ -2694,7 +2705,7 @@ HttpStructuredCacheService::GetUpstreamCacheChunks(std::string_view Names
{
if (Request.IsRecordRequest)
{
- m_CidStore.AddChunk(Compressed);
+ m_CidStore.AddChunk(Params.Value, RawHash);
}
else
{
@@ -2706,11 +2717,11 @@ HttpStructuredCacheService::GetUpstreamCacheChunks(std::string_view Names
Request.Value = std::move(Compressed);
}
}
- Key.ChunkId = Params.RawHash;
- Request.Exists = true;
- Request.TotalSize = Params.RawSize;
- Request.TotalSizeKnown = true;
- Request.Source = Params.Source;
+ Key.ChunkId = Params.RawHash;
+ Request.Exists = true;
+ Request.RawSize = Params.RawSize;
+ Request.RawSizeKnown = true;
+ Request.Source = Params.Source;
m_CacheStats.UpstreamHitCount++;
};
@@ -2741,7 +2752,7 @@ HttpStructuredCacheService::WriteGetCacheChunksResponse(std::string_view Namespa
}
else
{
- Writer.AddInteger("RawSize"sv, Request.TotalSize);
+ Writer.AddInteger("RawSize"sv, Request.RawSize);
}
ZEN_DEBUG("GETCACHECHUNKS HIT - '{}/{}/{}/{}' {} '{}' ({}) in {}",
@@ -2749,7 +2760,7 @@ HttpStructuredCacheService::WriteGetCacheChunksResponse(std::string_view Namespa
Request.Key->Key.Bucket,
Request.Key->Key.Hash,
Request.Key->ValueId,
- NiceBytes(Request.TotalSize),
+ NiceBytes(Request.RawSize),
Request.IsRecordRequest ? "Record"sv : "Value"sv,
Request.Source ? Request.Source->Url : "LOCAL"sv,
NiceLatencyNs(Request.ElapsedTimeUs * 1000));
diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp
index 1f48aaebe..75f845cbf 100644
--- a/zenserver/cache/structuredcachestore.cpp
+++ b/zenserver/cache/structuredcachestore.cpp
@@ -491,7 +491,7 @@ ZenCacheMemoryLayer::CacheBucket::Scrub(ScrubContext& Ctx)
std::vector<IoHash> BadHashes;
- auto ValidateEntry = [](ZenContentType ContentType, IoBuffer Buffer) {
+ auto ValidateEntry = [](const IoHash& Hash, ZenContentType ContentType, IoBuffer Buffer) {
if (ContentType == ZenContentType::kCbObject)
{
CbValidateError Error = ValidateCompactBinary(Buffer, CbValidateMode::All);
@@ -499,7 +499,13 @@ ZenCacheMemoryLayer::CacheBucket::Scrub(ScrubContext& Ctx)
}
if (ContentType == ZenContentType::kCompressedBinary)
{
- if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Buffer)); !Compressed)
+ IoHash RawHash;
+ uint64_t RawSize;
+ if (!CompressedBuffer::ValidateCompressedHeader(Buffer, RawHash, RawSize))
+ {
+ return false;
+ }
+ if (Hash != RawHash)
{
return false;
}
@@ -509,7 +515,7 @@ ZenCacheMemoryLayer::CacheBucket::Scrub(ScrubContext& Ctx)
for (auto& Kv : m_CacheMap)
{
- if (!ValidateEntry(Kv.second.Payload.GetContentType(), Kv.second.Payload))
+ if (!ValidateEntry(Kv.first, Kv.second.Payload.GetContentType(), Kv.second.Payload))
{
BadHashes.push_back(Kv.first);
}
@@ -1021,7 +1027,7 @@ ZenCacheDiskLayer::CacheBucket::Scrub(ScrubContext& Ctx)
std::vector<BlockStoreLocation> ChunkLocations;
std::vector<IoHash> ChunkIndexToChunkHash;
- auto ValidateEntry = [](ZenContentType ContentType, IoBuffer Buffer) {
+ auto ValidateEntry = [](const IoHash& Hash, ZenContentType ContentType, IoBuffer Buffer) {
if (ContentType == ZenContentType::kCbObject)
{
CbValidateError Error = ValidateCompactBinary(Buffer, CbValidateMode::All);
@@ -1029,7 +1035,13 @@ ZenCacheDiskLayer::CacheBucket::Scrub(ScrubContext& Ctx)
}
if (ContentType == ZenContentType::kCompressedBinary)
{
- if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Buffer)); !Compressed)
+ IoHash RawHash;
+ uint64_t RawSize;
+ if (!CompressedBuffer::ValidateCompressedHeader(Buffer, RawHash, RawSize))
+ {
+ return false;
+ }
+ if (RawHash != Hash)
{
return false;
}
@@ -1077,7 +1089,7 @@ ZenCacheDiskLayer::CacheBucket::Scrub(ScrubContext& Ctx)
BadKeys.push_back(HashKey);
continue;
}
- if (!ValidateEntry(Loc.GetContentType(), Value.Value))
+ if (!ValidateEntry(HashKey, Loc.GetContentType(), Value.Value))
{
BadKeys.push_back(HashKey);
continue;
@@ -1108,7 +1120,7 @@ ZenCacheDiskLayer::CacheBucket::Scrub(ScrubContext& Ctx)
return;
}
ZenContentType ContentType = m_Index.at(Hash).Location.GetContentType();
- if (!ValidateEntry(ContentType, Buffer))
+ if (!ValidateEntry(Hash, ContentType, Buffer))
{
BadKeys.push_back(Hash);
return;
@@ -1127,7 +1139,7 @@ ZenCacheDiskLayer::CacheBucket::Scrub(ScrubContext& Ctx)
return;
}
ZenContentType ContentType = m_Index.at(Hash).Location.GetContentType();
- if (!ValidateEntry(ContentType, Buffer))
+ if (!ValidateEntry(Hash, ContentType, Buffer))
{
BadKeys.push_back(Hash);
return;
@@ -1678,18 +1690,8 @@ ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, c
DataFile.MoveTemporaryIntoPlace(FsPath, Ec);
if (Ec)
{
- std::filesystem::path ParentPath = FsPath.parent_path();
- if (!std::filesystem::is_directory(ParentPath))
- {
- Ec.clear();
- std::filesystem::create_directories(ParentPath, Ec);
- if (Ec)
- {
- throw std::system_error(
- Ec,
- fmt::format("Failed to create parent directory '{}' for file '{}' for put in '{}'", ParentPath, FsPath, m_BucketDir));
- }
- }
+ CreateDirectories(FsPath.parent_path());
+ Ec.clear();
// Try again
DataFile.MoveTemporaryIntoPlace(FsPath, Ec);
@@ -3100,7 +3102,8 @@ TEST_CASE("z$.scrub")
{
IoBuffer AttachmentData = CreateBinaryCacheValue(AttachmentSizes[Index]);
CompressedBuffer CompressedAttachmentData = CompressedBuffer::Compress(SharedBuffer(AttachmentData));
- Record.AddBinaryAttachment(fmt::format("attachment-{}", Index), IoHash::FromBLAKE3(CompressedAttachmentData.GetRawHash()));
+ Record.AddBinaryAttachment(fmt::format("attachment-{}", Index),
+ IoHash::FromBLAKE3(CompressedAttachmentData.DecodeRawHash()));
Result.Attachments[Index] = CompressedAttachmentData;
}
Result.Record = Record.Save().GetBuffer().AsIoBuffer();
@@ -3142,7 +3145,7 @@ TEST_CASE("z$.scrub")
Zcs.Put("mybucket", Cid, {.Value = Record.Record});
for (const CompressedBuffer& Attachment : Record.Attachments)
{
- CidStore.AddChunk(Attachment);
+ CidStore.AddChunk(Attachment.GetCompressed().Flatten().AsIoBuffer(), IoHash::FromBLAKE3(Attachment.DecodeRawHash()));
}
}
};