diff options
| author | Per Larsson <[email protected]> | 2021-09-16 16:21:16 +0200 |
|---|---|---|
| committer | GitHub <[email protected]> | 2021-09-16 16:21:16 +0200 |
| commit | b166d4081655c4c181ed915ec5475ed535c67a9d (patch) | |
| tree | 2a7d6dd512dc3cfb106442f656bdc65a2650dcac /zenserver/cache/structuredcache.cpp | |
| parent | clang-format fixes (diff) | |
| download | zen-b166d4081655c4c181ed915ec5475ed535c67a9d.tar.xz zen-b166d4081655c4c181ed915ec5475ed535c67a9d.zip | |
Compact binary package caching support (#9)
Diffstat (limited to 'zenserver/cache/structuredcache.cpp')
| -rw-r--r-- | zenserver/cache/structuredcache.cpp | 430 |
1 files changed, 314 insertions, 116 deletions
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp index 9600c5f8a..cf7deaa93 100644 --- a/zenserver/cache/structuredcache.cpp +++ b/zenserver/cache/structuredcache.cpp @@ -1,10 +1,12 @@ // Copyright Epic Games, Inc. All Rights Reserved. +#include <zencore/compactbinary.h> #include <zencore/compactbinarybuilder.h> #include <zencore/compactbinaryvalidation.h> #include <zencore/compress.h> #include <zencore/fmtutils.h> #include <zencore/logging.h> +#include <zencore/stream.h> #include <zencore/timer.h> #include <zenhttp/httpserver.h> @@ -15,6 +17,8 @@ #include "upstream/zen.h" #include "zenstore/cidstore.h" +#include <zencore/compactbinarypackage.h> + #include <algorithm> #include <atomic> #include <filesystem> @@ -37,7 +41,6 @@ HttpStructuredCacheService::HttpStructuredCacheService(::ZenCacheStore& InC , m_CidStore(InCidStore) , m_UpstreamCache(std::move(UpstreamCache)) { - // m_Log.set_level(spdlog::level::debug); } HttpStructuredCacheService::~HttpStructuredCacheService() @@ -127,14 +130,17 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req case kHead: case kGet: { + const ZenContentType AcceptType = Request.AcceptContentType(); + ZenCacheValue Value; bool Success = m_CacheStore.Get(Ref.BucketSegment, Ref.HashKey, /* out */ Value); bool InUpstreamCache = false; if (!Success && m_UpstreamCache) { - const ZenContentType CacheRecordType = - Ref.BucketSegment == "legacy"sv ? ZenContentType::kBinary : ZenContentType::kCbObject; + const ZenContentType CacheRecordType = Ref.BucketSegment == "legacy"sv ? ZenContentType::kBinary + : AcceptType == ZenContentType::kCbPackage ? ZenContentType::kCbPackage + : ZenContentType::kCbObject; if (auto UpstreamResult = m_UpstreamCache->GetCacheRecord({Ref.BucketSegment, Ref.HashKey}, CacheRecordType); UpstreamResult.Success) @@ -143,43 +149,85 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req Success = true; InUpstreamCache = true; - if (CacheRecordType == ZenContentType::kCbObject) + if (CacheRecordType == ZenContentType::kBinary || CacheRecordType == ZenContentType::kCbObject) { - const zen::CbValidateError ValidationResult = - zen::ValidateCompactBinary(MemoryView(UpstreamResult.Value.Data(), UpstreamResult.Value.Size()), - zen::CbValidateMode::All); - - if (ValidationResult == CbValidateError::None) + if (CacheRecordType == ZenContentType::kCbObject) { - zen::CbObjectView Cbo(UpstreamResult.Value.Data()); + const zen::CbValidateError ValidationResult = + zen::ValidateCompactBinary(UpstreamResult.Value, zen::CbValidateMode::All); - std::vector<IoHash> References; - Cbo.IterateAttachments([&](CbFieldView AttachmentView) { References.push_back(AttachmentView.AsHash()); }); + if (ValidationResult == CbValidateError::None) + { + zen::CbObjectView CacheRecord(UpstreamResult.Value.Data()); - if (!References.empty()) + zen::CbObjectWriter IndexData; + IndexData.BeginArray("references"); + CacheRecord.IterateAttachments([&](CbFieldView Attachment) { IndexData.AddHash(Attachment.AsHash()); }); + IndexData.EndArray(); + + Value.IndexData = IndexData.Save(); + } + else { - zen::CbObjectWriter Idx; - Idx.BeginArray("references"); - for (const IoHash& Hash : References) - { - Idx.AddHash(Hash); - } - Idx.EndArray(); - - Value.IndexData = Idx.Save(); + Success = false; + ZEN_WARN("Get - cache record '{}/{}' FAILED, invalid compact binary object from upstream", + Ref.BucketSegment, + Ref.HashKey); } } - else + + if (Success) { - Value.Value = IoBuffer(); - Success = false; - ZEN_WARN("Upstream cache record '{}/{}' failed validation", Ref.BucketSegment, Ref.HashKey); + m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, Value); } } - - if (Success) + else { - m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, Value); + ZEN_ASSERT(CacheRecordType == ZenContentType::kCbPackage); + + CbPackage Package; + if (Package.TryLoad(UpstreamResult.Value)) + { + uint32_t AttachmentCount = 0; + uint32_t FoundCount = 0; + CbObject CacheRecord = Package.GetObject(); + + CacheRecord.IterateAttachments( + [this, &Package, &Ref, &AttachmentCount, &FoundCount](CbFieldView AttachmentHash) { + if (const CbAttachment* Attachment = Package.FindAttachment(AttachmentHash.AsHash())) + { + if (CompressedBuffer Chunk = Attachment->AsCompressedBinary()) + { + m_CidStore.AddChunk(Chunk); + FoundCount++; + } + else + { + ZEN_WARN("GET - cache record '{}/{}' FAILED, upstream attachment not compressed", + Ref.BucketSegment, + Ref.HashKey); + } + } + AttachmentCount++; + }); + + if (FoundCount == AttachmentCount) + { + m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, {.Value = CacheRecord.GetBuffer().AsIoBuffer()}); + } + else + { + Success = false; + ZEN_WARN("GET - cache record '{}/{}' FAILED, attachments missing in upstream package", + Ref.BucketSegment, + Ref.HashKey); + } + } + else + { + Success = false; + ZEN_WARN("GET - cache record '{}/{}' FAILED, invalid upstream package", Ref.BucketSegment, Ref.HashKey); + } } } } @@ -196,14 +244,74 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req Request.SetSuppressResponseBody(); } - ZEN_DEBUG("HIT - '{}/{}' ({} bytes {}) ({})", - Ref.BucketSegment, - Ref.HashKey, - Value.Value.Size(), - Value.Value.GetContentType(), - InUpstreamCache ? "upstream" : "local"); + if (AcceptType == ZenContentType::kCbPackage && !InUpstreamCache) + { + CbObjectView CacheRecord(Value.Value.Data()); + + const zen::CbValidateError ValidationResult = zen::ValidateCompactBinary(Value.Value, zen::CbValidateMode::All); + + if (ValidationResult != CbValidateError::None) + { + ZEN_WARN("GET - cache record '{}/{}' FAILED, invalid compact binary object", Ref.BucketSegment, Ref.HashKey); + + return Request.WriteResponse(zen::HttpResponseCode::NotFound, HttpContentType::kText, "Invalid cache record"sv); + } + + uint32_t AttachmentCount = 0; + uint32_t FoundCount = 0; + uint64_t AttachmentBytes = 0ull; + + CbPackage Package; + + CacheRecord.IterateAttachments( + [this, &Ref, &Package, &AttachmentCount, &FoundCount, &AttachmentBytes](CbFieldView AttachmentHash) { + if (IoBuffer Chunk = m_CidStore.FindChunkByCid(AttachmentHash.AsHash())) + { + Package.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Chunk)))); + AttachmentBytes += Chunk.Size(); + FoundCount++; + } + AttachmentCount++; + }); + + if (FoundCount != AttachmentCount) + { + ZEN_WARN("GET - cache record '{}/{}' FAILED, found '{}' of '{}' attachments", + Ref.BucketSegment, + Ref.HashKey, + FoundCount, + AttachmentCount); + + return Request.WriteResponse(zen::HttpResponseCode::NotFound, HttpContentType::kText, "Missing attachments"sv); + } + + Package.SetObject(LoadCompactBinaryObject(Value.Value)); + + ZEN_DEBUG("HIT - '{}/{}' {}, {} attachments ({})", + Ref.BucketSegment, + Ref.HashKey, + NiceBytes(AttachmentBytes + Value.Value.Size()), + AttachmentCount, + InUpstreamCache ? "UPSTREAM" : "LOCAL"); + + MemoryOutStream MemStream; + BinaryWriter Writer(MemStream); + Package.Save(Writer); + + IoBuffer Response(IoBuffer::Clone, MemStream.Data(), MemStream.Size()); - return Request.WriteResponse(zen::HttpResponseCode::OK, Value.Value.GetContentType(), Value.Value); + return Request.WriteResponse(zen::HttpResponseCode::OK, HttpContentType::kCbPackage, Response); + } + else + { + ZEN_DEBUG("HIT - '{}/{}' {} ({})", + Ref.BucketSegment, + Ref.HashKey, + NiceBytes(Value.Value.Size()), + InUpstreamCache ? "UPSTREAM" : "LOCAL"); + + return Request.WriteResponse(zen::HttpResponseCode::OK, Value.Value.GetContentType(), Value.Value); + } } break; @@ -218,28 +326,11 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req const HttpContentType ContentType = Request.RequestContentType(); - bool IsCompactBinary = false; - - switch (ContentType) - { - case HttpContentType::kUnknownContentType: - case HttpContentType::kBinary: - IsCompactBinary = false; - break; - - case HttpContentType::kCbObject: - IsCompactBinary = true; - break; - - default: - return Request.WriteResponse(zen::HttpResponseCode::BadRequest); - } - - if (!IsCompactBinary) + if (ContentType == HttpContentType::kBinary || ContentType == HttpContentType::kUnknownContentType) { // TODO: create a cache record and put value in CAS? m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, {.Value = Body}); - ZEN_DEBUG("PUT (binary) - '{}/{}' ({} bytes, {})", Ref.BucketSegment, Ref.HashKey, Body.Size(), Body.GetContentType()); + ZEN_DEBUG("PUT - binary '{}/{}' {}", Ref.BucketSegment, Ref.HashKey, NiceBytes(Body.Size())); if (m_UpstreamCache) { @@ -249,86 +340,193 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req return Request.WriteResponse(zen::HttpResponseCode::Created); } - - // Validate payload before accessing it - const zen::CbValidateError ValidationResult = - zen::ValidateCompactBinary(MemoryView(Body.Data(), Body.Size()), zen::CbValidateMode::All); - - if (ValidationResult != CbValidateError::None) + else if (ContentType == HttpContentType::kCbObject) { - ZEN_WARN("Payload for key '{}/{}' ({} bytes) failed validation", Ref.BucketSegment, Ref.HashKey, Body.Size()); + // Validate payload before accessing it + const zen::CbValidateError ValidationResult = + zen::ValidateCompactBinary(MemoryView(Body.Data(), Body.Size()), zen::CbValidateMode::All); - // TODO: add details in response, kText || kCbObject? - return Request.WriteResponse(HttpResponseCode::BadRequest, - HttpContentType::kText, - "Compact binary validation failed"sv); - } + if (ValidationResult != CbValidateError::None) + { + ZEN_WARN("PUT - cache record '{}/{}' ({} bytes) FAILED, invalid compact binary", + Ref.BucketSegment, + Ref.HashKey, + Body.Size()); - // Extract referenced payload hashes - zen::CbObjectView Cbo(Body.Data()); + // TODO: add details in response, kText || kCbObject? + return Request.WriteResponse(HttpResponseCode::BadRequest, + HttpContentType::kText, + "Compact binary validation failed"sv); + } - std::vector<IoHash> References; - std::vector<IoHash> MissingRefs; - Cbo.IterateAttachments([&](CbFieldView AttachmentView) { References.push_back(AttachmentView.AsHash()); }); + // Extract referenced payload hashes + zen::CbObjectView Cbo(Body.Data()); - ZenCacheValue CacheValue; - CacheValue.Value = Body; + std::vector<IoHash> References; + std::vector<IoHash> MissingRefs; + Cbo.IterateAttachments([&](CbFieldView AttachmentView) { References.push_back(AttachmentView.AsHash()); }); - if (!References.empty()) - { - zen::CbObjectWriter Idx; - Idx.BeginArray("references"); + ZenCacheValue CacheValue; + CacheValue.Value = Body; - for (const IoHash& Hash : References) + if (!References.empty()) { - Idx.AddHash(Hash); - if (!m_CidStore.ContainsChunk(Hash)) + zen::CbObjectWriter Idx; + Idx.BeginArray("references"); + + for (const IoHash& Hash : References) { - MissingRefs.push_back(Hash); + Idx.AddHash(Hash); + if (!m_CidStore.ContainsChunk(Hash)) + { + MissingRefs.push_back(Hash); + } } + + Idx.EndArray(); + + CacheValue.IndexData = Idx.Save(); } - Idx.EndArray(); + m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, CacheValue); - CacheValue.IndexData = Idx.Save(); - } + ZEN_DEBUG("PUT - cache record '{}/{}' {}, {}/{} attachments missing", + Ref.BucketSegment, + Ref.HashKey, + zen::NiceBytes(CacheValue.Value.Size()), + MissingRefs.size(), + References.size()); - m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, CacheValue); + if (MissingRefs.empty()) + { + // Only enqueue valid cache records, i.e. all referenced payloads exists + if (m_UpstreamCache) + { + auto Result = m_UpstreamCache->EnqueueUpstream({.Type = ZenContentType::kCbObject, + .CacheKey = {Ref.BucketSegment, Ref.HashKey}, + .PayloadIds = std::move(References)}); + } - ZEN_DEBUG("PUT (cache record) - '{}/{}' ({} bytes, {}, ({}/{} refs/missing))", - Ref.BucketSegment, - Ref.HashKey, - CacheValue.Value.Size(), - CacheValue.Value.GetContentType(), - References.size(), - MissingRefs.size()); + return Request.WriteResponse(zen::HttpResponseCode::Created); + } + else + { + // TODO: Binary attachments? + zen::CbObjectWriter Response; + Response.BeginArray("needs"); + for (const IoHash& MissingRef : MissingRefs) + { + Response.AddHash(MissingRef); + ZEN_DEBUG("cache record '{}/{}' is missing reference '{}'", Ref.BucketSegment, Ref.HashKey, MissingRef); + } + Response.EndArray(); - if (MissingRefs.empty()) + // Return Created | BadRequest? + return Request.WriteResponse(zen::HttpResponseCode::Created, Response.Save()); + } + } + else if (ContentType == HttpContentType::kCbPackage) { - // Only enqueue valid cache records, i.e. all referenced payloads exists + CbPackage Package; + + if (!Package.TryLoad(Body)) + { + ZEN_WARN("PUT - cache record '{}/{}' FAILED, invalid package", Ref.BucketSegment, Ref.HashKey); + return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid package"); + } + + CbObject CacheRecord = Package.GetObject(); + + int32_t AttachmentCount = 0; + int32_t NewAttachmentCount = 0; + uint64_t TotalAttachmentBytes = 0; + uint64_t TotalNewBytes = 0; + bool AttachmentsOk = true; + + std::span<const CbAttachment> Attachments = Package.GetAttachments(); + + std::vector<IoHash> PayloadIds; + PayloadIds.reserve(Attachments.size()); + + CacheRecord.IterateAttachments([this, + &Ref, + &Package, + &AttachmentsOk, + &AttachmentCount, + &TotalAttachmentBytes, + &TotalNewBytes, + &NewAttachmentCount, + &PayloadIds](CbFieldView AttachmentHash) { + if (const CbAttachment* Attachment = Package.FindAttachment(AttachmentHash.AsHash())) + { + if (Attachment->IsCompressedBinary()) + { + CompressedBuffer Chunk = Attachment->AsCompressedBinary(); + const uint64_t ChunkSize = Chunk.GetCompressed().GetSize(); + CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Chunk); + + PayloadIds.emplace_back(InsertResult.DecompressedId); + + if (InsertResult.New) + { + TotalNewBytes += ChunkSize; + ++NewAttachmentCount; + } + + TotalAttachmentBytes += ChunkSize; + AttachmentCount++; + } + else + { + ZEN_WARN("PUT - cache record '{}/{}' FAILED, attachment '{}' is not compressed", + Ref.BucketSegment, + Ref.HashKey, + AttachmentHash.AsHash()); + AttachmentsOk = false; + } + } + else + { + ZEN_WARN("PUT - cache record '{}/{}' FAILED, missing attachment '{}'", + Ref.BucketSegment, + Ref.HashKey, + AttachmentHash.AsHash()); + AttachmentsOk = false; + } + }); + + if (!AttachmentsOk) + { + return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid attachments"); + } + + IoBuffer CacheRecordChunk = CacheRecord.GetBuffer().AsIoBuffer(); + const uint64_t TotalPackageBytes = TotalAttachmentBytes + CacheRecordChunk.Size(); + + ZenCacheValue CacheValue{.Value = CacheRecordChunk}; + m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, CacheValue); + if (m_UpstreamCache) { - auto Result = m_UpstreamCache->EnqueueUpstream({.Type = ZenContentType::kCbObject, + auto Result = m_UpstreamCache->EnqueueUpstream({.Type = ZenContentType::kCbPackage, .CacheKey = {Ref.BucketSegment, Ref.HashKey}, - .PayloadIds = std::move(References)}); + .PayloadIds = std::move(PayloadIds)}); } + ZEN_DEBUG("PUT - cache record '{}/{}' {}, {}/{} ({}/{}) new attachments", + Ref.BucketSegment, + Ref.HashKey, + zen::NiceBytes(TotalPackageBytes), + NewAttachmentCount, + AttachmentCount, + zen::NiceBytes(TotalNewBytes), + zen::NiceBytes(TotalAttachmentBytes)); + return Request.WriteResponse(zen::HttpResponseCode::Created); } else { - // TODO: Binary attachments? - zen::CbObjectWriter Response; - Response.BeginArray("needs"); - for (const IoHash& MissingRef : MissingRefs) - { - Response.AddHash(MissingRef); - ZEN_DEBUG("cache record '{}/{}' is missing reference '{}'", Ref.BucketSegment, Ref.HashKey, MissingRef); - } - Response.EndArray(); - - // Return Created | BadRequest? - return Request.WriteResponse(zen::HttpResponseCode::Created, Response.Save()); + return Request.WriteResponse(zen::HttpResponseCode::BadRequest); } } break; @@ -387,13 +585,13 @@ HttpStructuredCacheService::HandleCachePayloadRequest(zen::HttpServerRequest& Re return Request.WriteResponse(zen::HttpResponseCode::NotFound); } - ZEN_DEBUG("HIT - '{}/{}/{}' ({} bytes, {}) ({})", + ZEN_DEBUG("HIT - '{}/{}/{}' {} (type: {}) ({})", Ref.BucketSegment, Ref.HashKey, Ref.PayloadId, - Payload.Size(), + NiceBytes(Payload.Size()), Payload.GetContentType(), - InUpstreamCache ? "upstream" : "local"); + InUpstreamCache ? "UPSTREAM" : "LOCAL"); if (Verb == kHead) { @@ -438,13 +636,13 @@ HttpStructuredCacheService::HandleCachePayloadRequest(zen::HttpServerRequest& Re m_CidStore.AddCompressedCid(Ref.PayloadId, ChunkHash); - ZEN_DEBUG("PUT ({}) - '{}/{}/{}' ({} bytes, {})", - Result.New ? "NEW" : "OLD", + ZEN_DEBUG("PUT - payload '{}/{}/{}' {} (type: {}) {}", Ref.BucketSegment, Ref.HashKey, Ref.PayloadId, - Body.Size(), - Body.GetContentType()); + NiceBytes(Body.Size()), + Body.GetContentType(), + Result.New ? "NEW" : "OLD"); if (Result.New) { |