aboutsummaryrefslogtreecommitdiff
path: root/zenserver/cache/structuredcache.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'zenserver/cache/structuredcache.cpp')
-rw-r--r--zenserver/cache/structuredcache.cpp430
1 files changed, 314 insertions, 116 deletions
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp
index 9600c5f8a..cf7deaa93 100644
--- a/zenserver/cache/structuredcache.cpp
+++ b/zenserver/cache/structuredcache.cpp
@@ -1,10 +1,12 @@
// Copyright Epic Games, Inc. All Rights Reserved.
+#include <zencore/compactbinary.h>
#include <zencore/compactbinarybuilder.h>
#include <zencore/compactbinaryvalidation.h>
#include <zencore/compress.h>
#include <zencore/fmtutils.h>
#include <zencore/logging.h>
+#include <zencore/stream.h>
#include <zencore/timer.h>
#include <zenhttp/httpserver.h>
@@ -15,6 +17,8 @@
#include "upstream/zen.h"
#include "zenstore/cidstore.h"
+#include <zencore/compactbinarypackage.h>
+
#include <algorithm>
#include <atomic>
#include <filesystem>
@@ -37,7 +41,6 @@ HttpStructuredCacheService::HttpStructuredCacheService(::ZenCacheStore& InC
, m_CidStore(InCidStore)
, m_UpstreamCache(std::move(UpstreamCache))
{
- // m_Log.set_level(spdlog::level::debug);
}
HttpStructuredCacheService::~HttpStructuredCacheService()
@@ -127,14 +130,17 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req
case kHead:
case kGet:
{
+ const ZenContentType AcceptType = Request.AcceptContentType();
+
ZenCacheValue Value;
bool Success = m_CacheStore.Get(Ref.BucketSegment, Ref.HashKey, /* out */ Value);
bool InUpstreamCache = false;
if (!Success && m_UpstreamCache)
{
- const ZenContentType CacheRecordType =
- Ref.BucketSegment == "legacy"sv ? ZenContentType::kBinary : ZenContentType::kCbObject;
+ const ZenContentType CacheRecordType = Ref.BucketSegment == "legacy"sv ? ZenContentType::kBinary
+ : AcceptType == ZenContentType::kCbPackage ? ZenContentType::kCbPackage
+ : ZenContentType::kCbObject;
if (auto UpstreamResult = m_UpstreamCache->GetCacheRecord({Ref.BucketSegment, Ref.HashKey}, CacheRecordType);
UpstreamResult.Success)
@@ -143,43 +149,85 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req
Success = true;
InUpstreamCache = true;
- if (CacheRecordType == ZenContentType::kCbObject)
+ if (CacheRecordType == ZenContentType::kBinary || CacheRecordType == ZenContentType::kCbObject)
{
- const zen::CbValidateError ValidationResult =
- zen::ValidateCompactBinary(MemoryView(UpstreamResult.Value.Data(), UpstreamResult.Value.Size()),
- zen::CbValidateMode::All);
-
- if (ValidationResult == CbValidateError::None)
+ if (CacheRecordType == ZenContentType::kCbObject)
{
- zen::CbObjectView Cbo(UpstreamResult.Value.Data());
+ const zen::CbValidateError ValidationResult =
+ zen::ValidateCompactBinary(UpstreamResult.Value, zen::CbValidateMode::All);
- std::vector<IoHash> References;
- Cbo.IterateAttachments([&](CbFieldView AttachmentView) { References.push_back(AttachmentView.AsHash()); });
+ if (ValidationResult == CbValidateError::None)
+ {
+ zen::CbObjectView CacheRecord(UpstreamResult.Value.Data());
- if (!References.empty())
+ zen::CbObjectWriter IndexData;
+ IndexData.BeginArray("references");
+ CacheRecord.IterateAttachments([&](CbFieldView Attachment) { IndexData.AddHash(Attachment.AsHash()); });
+ IndexData.EndArray();
+
+ Value.IndexData = IndexData.Save();
+ }
+ else
{
- zen::CbObjectWriter Idx;
- Idx.BeginArray("references");
- for (const IoHash& Hash : References)
- {
- Idx.AddHash(Hash);
- }
- Idx.EndArray();
-
- Value.IndexData = Idx.Save();
+ Success = false;
+ ZEN_WARN("Get - cache record '{}/{}' FAILED, invalid compact binary object from upstream",
+ Ref.BucketSegment,
+ Ref.HashKey);
}
}
- else
+
+ if (Success)
{
- Value.Value = IoBuffer();
- Success = false;
- ZEN_WARN("Upstream cache record '{}/{}' failed validation", Ref.BucketSegment, Ref.HashKey);
+ m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, Value);
}
}
-
- if (Success)
+ else
{
- m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, Value);
+ ZEN_ASSERT(CacheRecordType == ZenContentType::kCbPackage);
+
+ CbPackage Package;
+ if (Package.TryLoad(UpstreamResult.Value))
+ {
+ uint32_t AttachmentCount = 0;
+ uint32_t FoundCount = 0;
+ CbObject CacheRecord = Package.GetObject();
+
+ CacheRecord.IterateAttachments(
+ [this, &Package, &Ref, &AttachmentCount, &FoundCount](CbFieldView AttachmentHash) {
+ if (const CbAttachment* Attachment = Package.FindAttachment(AttachmentHash.AsHash()))
+ {
+ if (CompressedBuffer Chunk = Attachment->AsCompressedBinary())
+ {
+ m_CidStore.AddChunk(Chunk);
+ FoundCount++;
+ }
+ else
+ {
+ ZEN_WARN("GET - cache record '{}/{}' FAILED, upstream attachment not compressed",
+ Ref.BucketSegment,
+ Ref.HashKey);
+ }
+ }
+ AttachmentCount++;
+ });
+
+ if (FoundCount == AttachmentCount)
+ {
+ m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, {.Value = CacheRecord.GetBuffer().AsIoBuffer()});
+ }
+ else
+ {
+ Success = false;
+ ZEN_WARN("GET - cache record '{}/{}' FAILED, attachments missing in upstream package",
+ Ref.BucketSegment,
+ Ref.HashKey);
+ }
+ }
+ else
+ {
+ Success = false;
+ ZEN_WARN("GET - cache record '{}/{}' FAILED, invalid upstream package", Ref.BucketSegment, Ref.HashKey);
+ }
}
}
}
@@ -196,14 +244,74 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req
Request.SetSuppressResponseBody();
}
- ZEN_DEBUG("HIT - '{}/{}' ({} bytes {}) ({})",
- Ref.BucketSegment,
- Ref.HashKey,
- Value.Value.Size(),
- Value.Value.GetContentType(),
- InUpstreamCache ? "upstream" : "local");
+ if (AcceptType == ZenContentType::kCbPackage && !InUpstreamCache)
+ {
+ CbObjectView CacheRecord(Value.Value.Data());
+
+ const zen::CbValidateError ValidationResult = zen::ValidateCompactBinary(Value.Value, zen::CbValidateMode::All);
+
+ if (ValidationResult != CbValidateError::None)
+ {
+ ZEN_WARN("GET - cache record '{}/{}' FAILED, invalid compact binary object", Ref.BucketSegment, Ref.HashKey);
+
+ return Request.WriteResponse(zen::HttpResponseCode::NotFound, HttpContentType::kText, "Invalid cache record"sv);
+ }
+
+ uint32_t AttachmentCount = 0;
+ uint32_t FoundCount = 0;
+ uint64_t AttachmentBytes = 0ull;
+
+ CbPackage Package;
+
+ CacheRecord.IterateAttachments(
+ [this, &Ref, &Package, &AttachmentCount, &FoundCount, &AttachmentBytes](CbFieldView AttachmentHash) {
+ if (IoBuffer Chunk = m_CidStore.FindChunkByCid(AttachmentHash.AsHash()))
+ {
+ Package.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Chunk))));
+ AttachmentBytes += Chunk.Size();
+ FoundCount++;
+ }
+ AttachmentCount++;
+ });
+
+ if (FoundCount != AttachmentCount)
+ {
+ ZEN_WARN("GET - cache record '{}/{}' FAILED, found '{}' of '{}' attachments",
+ Ref.BucketSegment,
+ Ref.HashKey,
+ FoundCount,
+ AttachmentCount);
+
+ return Request.WriteResponse(zen::HttpResponseCode::NotFound, HttpContentType::kText, "Missing attachments"sv);
+ }
+
+ Package.SetObject(LoadCompactBinaryObject(Value.Value));
+
+ ZEN_DEBUG("HIT - '{}/{}' {}, {} attachments ({})",
+ Ref.BucketSegment,
+ Ref.HashKey,
+ NiceBytes(AttachmentBytes + Value.Value.Size()),
+ AttachmentCount,
+ InUpstreamCache ? "UPSTREAM" : "LOCAL");
+
+ MemoryOutStream MemStream;
+ BinaryWriter Writer(MemStream);
+ Package.Save(Writer);
+
+ IoBuffer Response(IoBuffer::Clone, MemStream.Data(), MemStream.Size());
- return Request.WriteResponse(zen::HttpResponseCode::OK, Value.Value.GetContentType(), Value.Value);
+ return Request.WriteResponse(zen::HttpResponseCode::OK, HttpContentType::kCbPackage, Response);
+ }
+ else
+ {
+ ZEN_DEBUG("HIT - '{}/{}' {} ({})",
+ Ref.BucketSegment,
+ Ref.HashKey,
+ NiceBytes(Value.Value.Size()),
+ InUpstreamCache ? "UPSTREAM" : "LOCAL");
+
+ return Request.WriteResponse(zen::HttpResponseCode::OK, Value.Value.GetContentType(), Value.Value);
+ }
}
break;
@@ -218,28 +326,11 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req
const HttpContentType ContentType = Request.RequestContentType();
- bool IsCompactBinary = false;
-
- switch (ContentType)
- {
- case HttpContentType::kUnknownContentType:
- case HttpContentType::kBinary:
- IsCompactBinary = false;
- break;
-
- case HttpContentType::kCbObject:
- IsCompactBinary = true;
- break;
-
- default:
- return Request.WriteResponse(zen::HttpResponseCode::BadRequest);
- }
-
- if (!IsCompactBinary)
+ if (ContentType == HttpContentType::kBinary || ContentType == HttpContentType::kUnknownContentType)
{
// TODO: create a cache record and put value in CAS?
m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, {.Value = Body});
- ZEN_DEBUG("PUT (binary) - '{}/{}' ({} bytes, {})", Ref.BucketSegment, Ref.HashKey, Body.Size(), Body.GetContentType());
+ ZEN_DEBUG("PUT - binary '{}/{}' {}", Ref.BucketSegment, Ref.HashKey, NiceBytes(Body.Size()));
if (m_UpstreamCache)
{
@@ -249,86 +340,193 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req
return Request.WriteResponse(zen::HttpResponseCode::Created);
}
-
- // Validate payload before accessing it
- const zen::CbValidateError ValidationResult =
- zen::ValidateCompactBinary(MemoryView(Body.Data(), Body.Size()), zen::CbValidateMode::All);
-
- if (ValidationResult != CbValidateError::None)
+ else if (ContentType == HttpContentType::kCbObject)
{
- ZEN_WARN("Payload for key '{}/{}' ({} bytes) failed validation", Ref.BucketSegment, Ref.HashKey, Body.Size());
+ // Validate payload before accessing it
+ const zen::CbValidateError ValidationResult =
+ zen::ValidateCompactBinary(MemoryView(Body.Data(), Body.Size()), zen::CbValidateMode::All);
- // TODO: add details in response, kText || kCbObject?
- return Request.WriteResponse(HttpResponseCode::BadRequest,
- HttpContentType::kText,
- "Compact binary validation failed"sv);
- }
+ if (ValidationResult != CbValidateError::None)
+ {
+ ZEN_WARN("PUT - cache record '{}/{}' ({} bytes) FAILED, invalid compact binary",
+ Ref.BucketSegment,
+ Ref.HashKey,
+ Body.Size());
- // Extract referenced payload hashes
- zen::CbObjectView Cbo(Body.Data());
+ // TODO: add details in response, kText || kCbObject?
+ return Request.WriteResponse(HttpResponseCode::BadRequest,
+ HttpContentType::kText,
+ "Compact binary validation failed"sv);
+ }
- std::vector<IoHash> References;
- std::vector<IoHash> MissingRefs;
- Cbo.IterateAttachments([&](CbFieldView AttachmentView) { References.push_back(AttachmentView.AsHash()); });
+ // Extract referenced payload hashes
+ zen::CbObjectView Cbo(Body.Data());
- ZenCacheValue CacheValue;
- CacheValue.Value = Body;
+ std::vector<IoHash> References;
+ std::vector<IoHash> MissingRefs;
+ Cbo.IterateAttachments([&](CbFieldView AttachmentView) { References.push_back(AttachmentView.AsHash()); });
- if (!References.empty())
- {
- zen::CbObjectWriter Idx;
- Idx.BeginArray("references");
+ ZenCacheValue CacheValue;
+ CacheValue.Value = Body;
- for (const IoHash& Hash : References)
+ if (!References.empty())
{
- Idx.AddHash(Hash);
- if (!m_CidStore.ContainsChunk(Hash))
+ zen::CbObjectWriter Idx;
+ Idx.BeginArray("references");
+
+ for (const IoHash& Hash : References)
{
- MissingRefs.push_back(Hash);
+ Idx.AddHash(Hash);
+ if (!m_CidStore.ContainsChunk(Hash))
+ {
+ MissingRefs.push_back(Hash);
+ }
}
+
+ Idx.EndArray();
+
+ CacheValue.IndexData = Idx.Save();
}
- Idx.EndArray();
+ m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, CacheValue);
- CacheValue.IndexData = Idx.Save();
- }
+ ZEN_DEBUG("PUT - cache record '{}/{}' {}, {}/{} attachments missing",
+ Ref.BucketSegment,
+ Ref.HashKey,
+ zen::NiceBytes(CacheValue.Value.Size()),
+ MissingRefs.size(),
+ References.size());
- m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, CacheValue);
+ if (MissingRefs.empty())
+ {
+ // Only enqueue valid cache records, i.e. all referenced payloads exists
+ if (m_UpstreamCache)
+ {
+ auto Result = m_UpstreamCache->EnqueueUpstream({.Type = ZenContentType::kCbObject,
+ .CacheKey = {Ref.BucketSegment, Ref.HashKey},
+ .PayloadIds = std::move(References)});
+ }
- ZEN_DEBUG("PUT (cache record) - '{}/{}' ({} bytes, {}, ({}/{} refs/missing))",
- Ref.BucketSegment,
- Ref.HashKey,
- CacheValue.Value.Size(),
- CacheValue.Value.GetContentType(),
- References.size(),
- MissingRefs.size());
+ return Request.WriteResponse(zen::HttpResponseCode::Created);
+ }
+ else
+ {
+ // TODO: Binary attachments?
+ zen::CbObjectWriter Response;
+ Response.BeginArray("needs");
+ for (const IoHash& MissingRef : MissingRefs)
+ {
+ Response.AddHash(MissingRef);
+ ZEN_DEBUG("cache record '{}/{}' is missing reference '{}'", Ref.BucketSegment, Ref.HashKey, MissingRef);
+ }
+ Response.EndArray();
- if (MissingRefs.empty())
+ // Return Created | BadRequest?
+ return Request.WriteResponse(zen::HttpResponseCode::Created, Response.Save());
+ }
+ }
+ else if (ContentType == HttpContentType::kCbPackage)
{
- // Only enqueue valid cache records, i.e. all referenced payloads exists
+ CbPackage Package;
+
+ if (!Package.TryLoad(Body))
+ {
+ ZEN_WARN("PUT - cache record '{}/{}' FAILED, invalid package", Ref.BucketSegment, Ref.HashKey);
+ return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid package");
+ }
+
+ CbObject CacheRecord = Package.GetObject();
+
+ int32_t AttachmentCount = 0;
+ int32_t NewAttachmentCount = 0;
+ uint64_t TotalAttachmentBytes = 0;
+ uint64_t TotalNewBytes = 0;
+ bool AttachmentsOk = true;
+
+ std::span<const CbAttachment> Attachments = Package.GetAttachments();
+
+ std::vector<IoHash> PayloadIds;
+ PayloadIds.reserve(Attachments.size());
+
+ CacheRecord.IterateAttachments([this,
+ &Ref,
+ &Package,
+ &AttachmentsOk,
+ &AttachmentCount,
+ &TotalAttachmentBytes,
+ &TotalNewBytes,
+ &NewAttachmentCount,
+ &PayloadIds](CbFieldView AttachmentHash) {
+ if (const CbAttachment* Attachment = Package.FindAttachment(AttachmentHash.AsHash()))
+ {
+ if (Attachment->IsCompressedBinary())
+ {
+ CompressedBuffer Chunk = Attachment->AsCompressedBinary();
+ const uint64_t ChunkSize = Chunk.GetCompressed().GetSize();
+ CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Chunk);
+
+ PayloadIds.emplace_back(InsertResult.DecompressedId);
+
+ if (InsertResult.New)
+ {
+ TotalNewBytes += ChunkSize;
+ ++NewAttachmentCount;
+ }
+
+ TotalAttachmentBytes += ChunkSize;
+ AttachmentCount++;
+ }
+ else
+ {
+ ZEN_WARN("PUT - cache record '{}/{}' FAILED, attachment '{}' is not compressed",
+ Ref.BucketSegment,
+ Ref.HashKey,
+ AttachmentHash.AsHash());
+ AttachmentsOk = false;
+ }
+ }
+ else
+ {
+ ZEN_WARN("PUT - cache record '{}/{}' FAILED, missing attachment '{}'",
+ Ref.BucketSegment,
+ Ref.HashKey,
+ AttachmentHash.AsHash());
+ AttachmentsOk = false;
+ }
+ });
+
+ if (!AttachmentsOk)
+ {
+ return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid attachments");
+ }
+
+ IoBuffer CacheRecordChunk = CacheRecord.GetBuffer().AsIoBuffer();
+ const uint64_t TotalPackageBytes = TotalAttachmentBytes + CacheRecordChunk.Size();
+
+ ZenCacheValue CacheValue{.Value = CacheRecordChunk};
+ m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, CacheValue);
+
if (m_UpstreamCache)
{
- auto Result = m_UpstreamCache->EnqueueUpstream({.Type = ZenContentType::kCbObject,
+ auto Result = m_UpstreamCache->EnqueueUpstream({.Type = ZenContentType::kCbPackage,
.CacheKey = {Ref.BucketSegment, Ref.HashKey},
- .PayloadIds = std::move(References)});
+ .PayloadIds = std::move(PayloadIds)});
}
+ ZEN_DEBUG("PUT - cache record '{}/{}' {}, {}/{} ({}/{}) new attachments",
+ Ref.BucketSegment,
+ Ref.HashKey,
+ zen::NiceBytes(TotalPackageBytes),
+ NewAttachmentCount,
+ AttachmentCount,
+ zen::NiceBytes(TotalNewBytes),
+ zen::NiceBytes(TotalAttachmentBytes));
+
return Request.WriteResponse(zen::HttpResponseCode::Created);
}
else
{
- // TODO: Binary attachments?
- zen::CbObjectWriter Response;
- Response.BeginArray("needs");
- for (const IoHash& MissingRef : MissingRefs)
- {
- Response.AddHash(MissingRef);
- ZEN_DEBUG("cache record '{}/{}' is missing reference '{}'", Ref.BucketSegment, Ref.HashKey, MissingRef);
- }
- Response.EndArray();
-
- // Return Created | BadRequest?
- return Request.WriteResponse(zen::HttpResponseCode::Created, Response.Save());
+ return Request.WriteResponse(zen::HttpResponseCode::BadRequest);
}
}
break;
@@ -387,13 +585,13 @@ HttpStructuredCacheService::HandleCachePayloadRequest(zen::HttpServerRequest& Re
return Request.WriteResponse(zen::HttpResponseCode::NotFound);
}
- ZEN_DEBUG("HIT - '{}/{}/{}' ({} bytes, {}) ({})",
+ ZEN_DEBUG("HIT - '{}/{}/{}' {} (type: {}) ({})",
Ref.BucketSegment,
Ref.HashKey,
Ref.PayloadId,
- Payload.Size(),
+ NiceBytes(Payload.Size()),
Payload.GetContentType(),
- InUpstreamCache ? "upstream" : "local");
+ InUpstreamCache ? "UPSTREAM" : "LOCAL");
if (Verb == kHead)
{
@@ -438,13 +636,13 @@ HttpStructuredCacheService::HandleCachePayloadRequest(zen::HttpServerRequest& Re
m_CidStore.AddCompressedCid(Ref.PayloadId, ChunkHash);
- ZEN_DEBUG("PUT ({}) - '{}/{}/{}' ({} bytes, {})",
- Result.New ? "NEW" : "OLD",
+ ZEN_DEBUG("PUT - payload '{}/{}/{}' {} (type: {}) {}",
Ref.BucketSegment,
Ref.HashKey,
Ref.PayloadId,
- Body.Size(),
- Body.GetContentType());
+ NiceBytes(Body.Size()),
+ Body.GetContentType(),
+ Result.New ? "NEW" : "OLD");
if (Result.New)
{