diff options
| author | Per Larsson <[email protected]> | 2021-09-21 15:10:39 +0200 |
|---|---|---|
| committer | Per Larsson <[email protected]> | 2021-09-21 15:10:39 +0200 |
| commit | 9a92f2e73fb186d1fce69c5d784ba441e39fb9ec (patch) | |
| tree | 1f74cd00751fe1c35f7a02446e4f61e8be727e68 /zenserver/cache/structuredcache.cpp | |
| parent | Clang format fix. (diff) | |
| download | zen-9a92f2e73fb186d1fce69c5d784ba441e39fb9ec.tar.xz zen-9a92f2e73fb186d1fce69c5d784ba441e39fb9ec.zip | |
Refactored out get/set cache record.
Diffstat (limited to 'zenserver/cache/structuredcache.cpp')
| -rw-r--r-- | zenserver/cache/structuredcache.cpp | 644 |
1 files changed, 297 insertions, 347 deletions
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp index 3743bdc53..87ec10520 100644 --- a/zenserver/cache/structuredcache.cpp +++ b/zenserver/cache/structuredcache.cpp @@ -244,7 +244,7 @@ HttpStructuredCacheService::HandleCacheBucketRequest(HttpServerRequest& Request, } void -HttpStructuredCacheService::HandleCacheRecordRequest(HttpServerRequest& Request, CacheRef& Ref, CachePolicy Policy) +HttpStructuredCacheService::HandleCacheRecordRequest(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy) { switch (auto Verb = Request.RequestVerb()) { @@ -253,425 +253,375 @@ HttpStructuredCacheService::HandleCacheRecordRequest(HttpServerRequest& Request, case kHead: case kGet: { - const ZenContentType AcceptType = Request.AcceptContentType(); + HandleGetCacheRecord(Request, Ref, Policy); + if (Verb == kHead) + { + Request.SetSuppressResponseBody(); + } + } + break; + case kPut: + HandlePutCacheRecord(Request, Ref, Policy); + break; + default: + break; + } +} - ZenCacheValue Value; - bool Success = m_CacheStore.Get(Ref.BucketSegment, Ref.HashKey, /* out */ Value); - bool InUpstreamCache = false; +void +HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy) +{ + const ZenContentType AcceptType = Request.AcceptContentType(); - const bool QueryUpstream = !Success && m_UpstreamCache && (CachePolicy::QueryRemote == (Policy & CachePolicy::QueryRemote)); + ZenCacheValue Value; + bool Success = m_CacheStore.Get(Ref.BucketSegment, Ref.HashKey, /* out */ Value); + bool InUpstreamCache = false; - if (QueryUpstream) - { - const ZenContentType CacheRecordType = Ref.BucketSegment == "legacy"sv ? ZenContentType::kBinary - : AcceptType == ZenContentType::kCbPackage ? ZenContentType::kCbPackage - : ZenContentType::kCbObject; + const bool QueryUpstream = !Success && m_UpstreamCache && (CachePolicy::QueryRemote == (Policy & CachePolicy::QueryRemote)); - if (auto UpstreamResult = m_UpstreamCache->GetCacheRecord({Ref.BucketSegment, Ref.HashKey}, CacheRecordType); - UpstreamResult.Success) - { - Value.Value = UpstreamResult.Value; - Success = true; - InUpstreamCache = true; + if (QueryUpstream) + { + const ZenContentType CacheRecordType = Ref.BucketSegment == "legacy"sv ? ZenContentType::kBinary + : AcceptType == ZenContentType::kCbPackage ? ZenContentType::kCbPackage + : ZenContentType::kCbObject; - if (CacheRecordType == ZenContentType::kBinary || CacheRecordType == ZenContentType::kCbObject) - { - if (CacheRecordType == ZenContentType::kCbObject) - { - const CbValidateError ValidationResult = ValidateCompactBinary(UpstreamResult.Value, CbValidateMode::All); - - if (ValidationResult == CbValidateError::None) - { - CbObjectView CacheRecord(UpstreamResult.Value.Data()); - - CbObjectWriter IndexData; - IndexData.BeginArray("references"); - CacheRecord.IterateAttachments([&](CbFieldView Attachment) { IndexData.AddHash(Attachment.AsHash()); }); - IndexData.EndArray(); - - Value.IndexData = IndexData.Save(); - } - else - { - Success = false; - ZEN_WARN("Get - cache record '{}/{}' FAILED, invalid compact binary object from upstream", - Ref.BucketSegment, - Ref.HashKey); - } - } + if (auto UpstreamResult = m_UpstreamCache->GetCacheRecord({Ref.BucketSegment, Ref.HashKey}, CacheRecordType); + UpstreamResult.Success) + { + Value.Value = UpstreamResult.Value; + Success = true; + InUpstreamCache = true; - if (Success) - { - m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, Value); - } - } - else - { - ZEN_ASSERT(CacheRecordType == ZenContentType::kCbPackage); + if (CacheRecordType == ZenContentType::kBinary || CacheRecordType == ZenContentType::kCbObject) + { + if (CacheRecordType == ZenContentType::kCbObject) + { + const CbValidateError ValidationResult = ValidateCompactBinary(UpstreamResult.Value, CbValidateMode::All); - CbPackage Package; - if (Package.TryLoad(UpstreamResult.Value)) - { - uint32_t AttachmentCount = 0; - uint32_t FoundCount = 0; - CbObject CacheRecord = Package.GetObject(); - - CacheRecord.IterateAttachments( - [this, &Package, &Ref, &AttachmentCount, &FoundCount](CbFieldView AttachmentHash) { - if (const CbAttachment* Attachment = Package.FindAttachment(AttachmentHash.AsHash())) - { - if (CompressedBuffer Chunk = Attachment->AsCompressedBinary()) - { - m_CidStore.AddChunk(Chunk); - FoundCount++; - } - else - { - ZEN_WARN("GET - cache record '{}/{}' FAILED, upstream attachment not compressed", - Ref.BucketSegment, - Ref.HashKey); - } - } - AttachmentCount++; - }); - - if (FoundCount == AttachmentCount) - { - m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, {.Value = CacheRecord.GetBuffer().AsIoBuffer()}); - - if (zen::CachePolicy::SkipAttachments == (Policy & zen::CachePolicy::SkipAttachments)) - { - CbPackage PackageWithoutAttachments; - PackageWithoutAttachments.SetObject(CacheRecord); - - MemoryOutStream MemStream; - BinaryWriter Writer(MemStream); - PackageWithoutAttachments.Save(Writer); - - Value.Value = IoBuffer(IoBuffer::Clone, MemStream.Data(), MemStream.Size()); - } - } - else - { - Success = false; - ZEN_WARN("GET - cache record '{}/{}' FAILED, attachments missing in upstream package", - Ref.BucketSegment, - Ref.HashKey); - } - } - else - { - Success = false; - ZEN_WARN("GET - cache record '{}/{}' FAILED, invalid upstream package", Ref.BucketSegment, Ref.HashKey); - } - } - } - } + if (ValidationResult == CbValidateError::None) + { + CbObjectView CacheRecord(UpstreamResult.Value.Data()); - if (!Success) - { - ZEN_DEBUG("MISS - '{}/{}'", Ref.BucketSegment, Ref.HashKey); + CbObjectWriter IndexData; + IndexData.BeginArray("references"); + CacheRecord.IterateAttachments([&](CbFieldView Attachment) { IndexData.AddHash(Attachment.AsHash()); }); + IndexData.EndArray(); - return Request.WriteResponse(HttpResponseCode::NotFound); + Value.IndexData = IndexData.Save(); + } + else + { + Success = false; + ZEN_WARN("Get - cache record '{}/{}' FAILED, invalid compact binary object from upstream", + Ref.BucketSegment, + Ref.HashKey); + } } - if (Verb == kHead) + if (Success) { - Request.SetSuppressResponseBody(); + m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, Value); } + } + else + { + ZEN_ASSERT(CacheRecordType == ZenContentType::kCbPackage); - if (AcceptType == ZenContentType::kCbPackage && !InUpstreamCache) + CbPackage Package; + if (Package.TryLoad(UpstreamResult.Value)) { - CbObjectView CacheRecord(Value.Value.Data()); + uint32_t AttachmentCount = 0; + uint32_t ValidCount = 0; + CbObject CacheRecord = Package.GetObject(); - const CbValidateError ValidationResult = ValidateCompactBinary(Value.Value, CbValidateMode::All); + CacheRecord.IterateAttachments([this, &Package, &Ref, &AttachmentCount, &ValidCount](CbFieldView AttachmentHash) { + if (const CbAttachment* Attachment = Package.FindAttachment(AttachmentHash.AsHash())) + { + if (CompressedBuffer Chunk = Attachment->AsCompressedBinary()) + { + m_CidStore.AddChunk(Chunk); + ValidCount++; + } + else + { + ZEN_WARN("GET - cache record '{}/{}' FAILED, upstream attachment not compressed", + Ref.BucketSegment, + Ref.HashKey); + } + } + AttachmentCount++; + }); - if (ValidationResult != CbValidateError::None) + if (ValidCount == AttachmentCount) { - ZEN_WARN("GET - cache record '{}/{}' FAILED, invalid compact binary object", Ref.BucketSegment, Ref.HashKey); - - return Request.WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, "Invalid cache record"sv); - } + m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, {.Value = CacheRecord.GetBuffer().AsIoBuffer()}); - const bool SkipAttachments = zen::CachePolicy::SkipAttachments == (Policy & zen::CachePolicy::SkipAttachments); - uint32_t AttachmentCount = 0; - uint32_t FoundCount = 0; - uint64_t AttachmentBytes = 0ull; - - CbPackage Package; - - if (!SkipAttachments) - { - CacheRecord.IterateAttachments( - [this, &Ref, &Package, &AttachmentCount, &FoundCount, &AttachmentBytes](CbFieldView AttachmentHash) { - if (IoBuffer Chunk = m_CidStore.FindChunkByCid(AttachmentHash.AsHash())) - { - Package.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Chunk)))); - AttachmentBytes += Chunk.Size(); - FoundCount++; - } - AttachmentCount++; - }); - - if (FoundCount != AttachmentCount) + if (zen::CachePolicy::SkipAttachments == (Policy & zen::CachePolicy::SkipAttachments)) { - ZEN_WARN("GET - cache record '{}/{}' FAILED, found '{}' of '{}' attachments", - Ref.BucketSegment, - Ref.HashKey, - FoundCount, - AttachmentCount); + CbPackage PackageWithoutAttachments; + PackageWithoutAttachments.SetObject(CacheRecord); - return Request.WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, "Missing attachments"sv); + MemoryOutStream MemStream; + BinaryWriter Writer(MemStream); + PackageWithoutAttachments.Save(Writer); + + Value.Value = IoBuffer(IoBuffer::Clone, MemStream.Data(), MemStream.Size()); } } - - Package.SetObject(LoadCompactBinaryObject(Value.Value)); - - ZEN_DEBUG("HIT - '{}/{}' {}, {} attachments ({})", - Ref.BucketSegment, - Ref.HashKey, - NiceBytes(AttachmentBytes + Value.Value.Size()), - AttachmentCount, - InUpstreamCache ? "UPSTREAM" : "LOCAL"); - - MemoryOutStream MemStream; - BinaryWriter Writer(MemStream); - Package.Save(Writer); - - IoBuffer Response(IoBuffer::Clone, MemStream.Data(), MemStream.Size()); - - return Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, Response); + else + { + Success = false; + ZEN_WARN("GET - cache record '{}/{}' FAILED, attachments missing in upstream package", + Ref.BucketSegment, + Ref.HashKey); + } } else { - ZEN_DEBUG("HIT - '{}/{}' {} ({})", - Ref.BucketSegment, - Ref.HashKey, - NiceBytes(Value.Value.Size()), - InUpstreamCache ? "UPSTREAM" : "LOCAL"); - - return Request.WriteResponse(HttpResponseCode::OK, Value.Value.GetContentType(), Value.Value); + Success = false; + ZEN_WARN("GET - cache record '{}/{}' FAILED, invalid upstream package", Ref.BucketSegment, Ref.HashKey); } } - break; + } + } - case kPut: - { - IoBuffer Body = Request.ReadPayload(); + if (!Success) + { + ZEN_DEBUG("MISS - '{}/{}'", Ref.BucketSegment, Ref.HashKey); - if (!Body || Body.Size() == 0) - { - return Request.WriteResponse(HttpResponseCode::BadRequest); - } + return Request.WriteResponse(HttpResponseCode::NotFound); + } - const HttpContentType ContentType = Request.RequestContentType(); + if (AcceptType == ZenContentType::kCbPackage && !InUpstreamCache) + { + CbObjectView CacheRecord(Value.Value.Data()); - const bool StoreUpstream = m_UpstreamCache && (CachePolicy::StoreRemote == (Policy & CachePolicy::StoreRemote)); + const CbValidateError ValidationResult = ValidateCompactBinary(Value.Value, CbValidateMode::All); - if (ContentType == HttpContentType::kBinary || ContentType == HttpContentType::kUnknownContentType) - { - // TODO: create a cache record and put value in CAS? - m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, {.Value = Body}); - ZEN_DEBUG("PUT - binary '{}/{}' {}", Ref.BucketSegment, Ref.HashKey, NiceBytes(Body.Size())); + if (ValidationResult != CbValidateError::None) + { + ZEN_WARN("GET - cache record '{}/{}' FAILED, invalid compact binary object", Ref.BucketSegment, Ref.HashKey); + return Request.WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, "Invalid cache record"sv); + } - if (StoreUpstream) + const bool SkipAttachments = zen::CachePolicy::SkipAttachments == (Policy & zen::CachePolicy::SkipAttachments); + uint32_t AttachmentCount = 0; + uint32_t ValidCount = 0; + uint64_t AttachmentBytes = 0ull; + + CbPackage Package; + + if (!SkipAttachments) + { + CacheRecord.IterateAttachments( + [this, &Ref, &Package, &AttachmentCount, &ValidCount, &AttachmentBytes](CbFieldView AttachmentHash) { + if (IoBuffer Chunk = m_CidStore.FindChunkByCid(AttachmentHash.AsHash())) { - auto Result = m_UpstreamCache->EnqueueUpstream( - {.Type = ZenContentType::kBinary, .CacheKey = {Ref.BucketSegment, Ref.HashKey}}); + Package.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Chunk)))); + AttachmentBytes += Chunk.Size(); + ValidCount++; } + AttachmentCount++; + }); - return Request.WriteResponse(HttpResponseCode::Created); - } - else if (ContentType == HttpContentType::kCbObject) - { - // Validate payload before accessing it - const CbValidateError ValidationResult = - ValidateCompactBinary(MemoryView(Body.Data(), Body.Size()), CbValidateMode::All); + if (ValidCount != AttachmentCount) + { + ZEN_WARN("GET - cache record '{}/{}' FAILED, found '{}' of '{}' attachments", + Ref.BucketSegment, + Ref.HashKey, + ValidCount, + AttachmentCount); - if (ValidationResult != CbValidateError::None) - { - ZEN_WARN("PUT - cache record '{}/{}' ({} bytes) FAILED, invalid compact binary", - Ref.BucketSegment, - Ref.HashKey, - Body.Size()); + return Request.WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, "Missing attachments"sv); + } + } - // TODO: add details in response, kText || kCbObject? - return Request.WriteResponse(HttpResponseCode::BadRequest, - HttpContentType::kText, - "Compact binary validation failed"sv); - } + Package.SetObject(LoadCompactBinaryObject(Value.Value)); - // Extract referenced payload hashes - CbObjectView Cbo(Body.Data()); + ZEN_DEBUG("HIT - '{}/{}' {}, {} attachments (LOCAL)", + Ref.BucketSegment, + Ref.HashKey, + NiceBytes(AttachmentBytes + Value.Value.Size()), + AttachmentCount); - std::vector<IoHash> References; - std::vector<IoHash> MissingRefs; - Cbo.IterateAttachments([&](CbFieldView AttachmentView) { References.push_back(AttachmentView.AsHash()); }); + MemoryOutStream MemStream; + BinaryWriter Writer(MemStream); + Package.Save(Writer); - ZenCacheValue CacheValue; - CacheValue.Value = Body; + IoBuffer Response(IoBuffer::Clone, MemStream.Data(), MemStream.Size()); - if (!References.empty()) - { - CbObjectWriter Idx; - Idx.BeginArray("references"); + Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, Response); + } + else + { + ZEN_DEBUG("HIT - '{}/{}' {} ({})", + Ref.BucketSegment, + Ref.HashKey, + NiceBytes(Value.Value.Size()), + InUpstreamCache ? "UPSTREAM" : "LOCAL"); - for (const IoHash& Hash : References) - { - Idx.AddHash(Hash); - if (!m_CidStore.ContainsChunk(Hash)) - { - MissingRefs.push_back(Hash); - } - } + Request.WriteResponse(HttpResponseCode::OK, Value.Value.GetContentType(), Value.Value); + } +} - Idx.EndArray(); +void +HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy) +{ + IoBuffer Body = Request.ReadPayload(); - CacheValue.IndexData = Idx.Save(); - } + if (!Body || Body.Size() == 0) + { + return Request.WriteResponse(HttpResponseCode::BadRequest); + } - m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, CacheValue); + const HttpContentType ContentType = Request.RequestContentType(); + const bool StoreUpstream = m_UpstreamCache && (CachePolicy::StoreRemote == (Policy & CachePolicy::StoreRemote)); - ZEN_DEBUG("PUT - cache record '{}/{}' {}, {}/{} attachments missing", - Ref.BucketSegment, - Ref.HashKey, - NiceBytes(CacheValue.Value.Size()), - MissingRefs.size(), - References.size()); + if (ContentType == HttpContentType::kBinary || ContentType == HttpContentType::kUnknownContentType) + { + m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, {.Value = Body}); + ZEN_DEBUG("PUT - binary '{}/{}' {}", Ref.BucketSegment, Ref.HashKey, NiceBytes(Body.Size())); - if (MissingRefs.empty() && StoreUpstream) - { - ZEN_ASSERT(m_UpstreamCache); - auto Result = m_UpstreamCache->EnqueueUpstream({.Type = ZenContentType::kCbObject, - .CacheKey = {Ref.BucketSegment, Ref.HashKey}, - .PayloadIds = std::move(References)}); + if (StoreUpstream) + { + ZEN_ASSERT(m_UpstreamCache); + auto Result = m_UpstreamCache->EnqueueUpstream({.Type = ZenContentType::kBinary, .CacheKey = {Ref.BucketSegment, Ref.HashKey}}); + } - return Request.WriteResponse(HttpResponseCode::Created); - } - else - { - // TODO: Binary attachments? - CbObjectWriter Response; - Response.BeginArray("needs"); - for (const IoHash& MissingRef : MissingRefs) - { - Response.AddHash(MissingRef); - ZEN_DEBUG("cache record '{}/{}' is missing reference '{}'", Ref.BucketSegment, Ref.HashKey, MissingRef); - } - Response.EndArray(); + Request.WriteResponse(HttpResponseCode::Created); + } + else if (ContentType == HttpContentType::kCbObject) + { + const CbValidateError ValidationResult = ValidateCompactBinary(MemoryView(Body.GetData(), Body.GetSize()), CbValidateMode::All); - // Return Created | BadRequest? - return Request.WriteResponse(HttpResponseCode::Created, Response.Save()); - } - } - else if (ContentType == HttpContentType::kCbPackage) - { - CbPackage Package; + if (ValidationResult != CbValidateError::None) + { + ZEN_WARN("PUT - cache record '{}/{}' ({} bytes) FAILED, invalid compact binary", Ref.BucketSegment, Ref.HashKey, Body.Size()); + return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Compact binary validation failed"sv); + } - if (!Package.TryLoad(Body)) - { - ZEN_WARN("PUT - cache record '{}/{}' FAILED, invalid package", Ref.BucketSegment, Ref.HashKey); - return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid package"); - } + CbObjectView CacheRecord(Body.Data()); + std::vector<IoHash> ValidAttachments; + uint32_t AttachmentCount = 0; - CbObject CacheRecord = Package.GetObject(); + CacheRecord.IterateAttachments([this, &AttachmentCount, &ValidAttachments](CbFieldView AttachmentHash) { + const IoHash Hash = AttachmentHash.AsHash(); + if (m_CidStore.ContainsChunk(Hash)) + { + ValidAttachments.emplace_back(Hash); + } + AttachmentCount++; + }); - struct AttachmentInsertResult - { - int32_t Count = 0; - int32_t NewCount = 0; - uint64_t Bytes = 0; - uint64_t NewBytes = 0; - bool Ok = false; - }; + const uint32_t ValidCount = static_cast<uint32_t>(ValidAttachments.size()); + const bool ValidCacheRecord = ValidCount == AttachmentCount; - AttachmentInsertResult AttachmentResult{.Ok = true}; - std::span<const CbAttachment> Attachments = Package.GetAttachments(); - std::vector<IoHash> PayloadIds; + if (ValidCacheRecord) + { + ZEN_DEBUG("PUT - cache record '{}/{}' {}, {} attachments", Ref.BucketSegment, Ref.HashKey, NiceBytes(Body.Size()), ValidCount); + m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, {.Value = Body}); - PayloadIds.reserve(Attachments.size()); + if (StoreUpstream) + { + ZEN_ASSERT(m_UpstreamCache); + auto Result = m_UpstreamCache->EnqueueUpstream({.Type = ZenContentType::kCbObject, + .CacheKey = {Ref.BucketSegment, Ref.HashKey}, + .PayloadIds = std::move(ValidAttachments)}); + } - CacheRecord.IterateAttachments([this, &Ref, &Package, &AttachmentResult, &PayloadIds](CbFieldView AttachmentHash) { - if (const CbAttachment* Attachment = Package.FindAttachment(AttachmentHash.AsHash())) - { - if (Attachment->IsCompressedBinary()) - { - CompressedBuffer Chunk = Attachment->AsCompressedBinary(); - const uint64_t ChunkSize = Chunk.GetCompressed().GetSize(); - CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Chunk); + Request.WriteResponse(HttpResponseCode::Created); + } + else + { + ZEN_WARN("PUT - cache record '{}/{}' FAILED, found {}/{} attachments", + Ref.BucketSegment, + Ref.HashKey, + ValidCount, + AttachmentCount); - PayloadIds.emplace_back(InsertResult.DecompressedId); + Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Missing attachments"sv); + } + } + else if (ContentType == HttpContentType::kCbPackage) + { + CbPackage Package; - if (InsertResult.New) - { - AttachmentResult.NewBytes += ChunkSize; - AttachmentResult.NewCount++; - } + if (!Package.TryLoad(Body)) + { + ZEN_WARN("PUT - cache record '{}/{}' FAILED, invalid package", Ref.BucketSegment, Ref.HashKey); + return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid package"sv); + } - AttachmentResult.Bytes += ChunkSize; - AttachmentResult.Count++; - } - else - { - ZEN_WARN("PUT - cache record '{}/{}' FAILED, attachment '{}' is not compressed", - Ref.BucketSegment, - Ref.HashKey, - AttachmentHash.AsHash()); - AttachmentResult.Ok = false; - } - } - else - { - ZEN_WARN("PUT - cache record '{}/{}' FAILED, missing attachment '{}'", - Ref.BucketSegment, - Ref.HashKey, - AttachmentHash.AsHash()); - AttachmentResult.Ok = false; - } - }); + CbObject CacheRecord = Package.GetObject(); - if (!AttachmentResult.Ok) - { - return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid attachments"); - } + std::span<const CbAttachment> Attachments = Package.GetAttachments(); + std::vector<IoHash> ValidAttachments; + int32_t NewAttachmentCount = 0; + + ValidAttachments.reserve(Attachments.size()); - IoBuffer CacheRecordChunk = CacheRecord.GetBuffer().AsIoBuffer(); - const uint64_t TotalPackageBytes = AttachmentResult.Bytes + CacheRecordChunk.Size(); + CacheRecord.IterateAttachments([this, &Ref, &Package, &ValidAttachments, &NewAttachmentCount](CbFieldView AttachmentHash) { + if (const CbAttachment* Attachment = Package.FindAttachment(AttachmentHash.AsHash())) + { + if (Attachment->IsCompressedBinary()) + { + CompressedBuffer Chunk = Attachment->AsCompressedBinary(); + const uint64_t ChunkSize = Chunk.GetCompressed().GetSize(); + CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Chunk); - ZenCacheValue CacheValue{.Value = CacheRecordChunk}; - m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, CacheValue); + ValidAttachments.emplace_back(InsertResult.DecompressedId); - if (StoreUpstream) + if (InsertResult.New) { - ZEN_ASSERT(m_UpstreamCache); - auto Result = m_UpstreamCache->EnqueueUpstream({.Type = ZenContentType::kCbPackage, - .CacheKey = {Ref.BucketSegment, Ref.HashKey}, - .PayloadIds = std::move(PayloadIds)}); + NewAttachmentCount++; } - - ZEN_DEBUG("PUT - cache record '{}/{}' {}, {}/{} ({}/{}) new attachments", - Ref.BucketSegment, - Ref.HashKey, - NiceBytes(TotalPackageBytes), - AttachmentResult.NewCount, - AttachmentResult.Count, - NiceBytes(AttachmentResult.NewBytes), - NiceBytes(AttachmentResult.Bytes)); - - return Request.WriteResponse(HttpResponseCode::Created); } else { - return Request.WriteResponse(HttpResponseCode::BadRequest); + ZEN_WARN("PUT - cache record '{}/{}' FAILED, attachment '{}' is not compressed", + Ref.BucketSegment, + Ref.HashKey, + AttachmentHash.AsHash()); } } - break; + else + { + ZEN_WARN("PUT - cache record '{}/{}' FAILED, missing attachment '{}'", + Ref.BucketSegment, + Ref.HashKey, + AttachmentHash.AsHash()); + } + }); - case kPost: - break; + const bool AttachmentsValid = ValidAttachments.size() == Attachments.size(); - default: - break; + if (!AttachmentsValid) + { + return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid attachments"sv); + } + + ZEN_DEBUG("PUT - cache record '{}/{}' {}, {}/{} new attachments", + Ref.BucketSegment, + Ref.HashKey, + NiceBytes(Body.GetSize()), + NewAttachmentCount, + Attachments.size()); + + m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, {.Value = CacheRecord.GetBuffer().AsIoBuffer()}); + + if (StoreUpstream) + { + ZEN_ASSERT(m_UpstreamCache); + auto Result = m_UpstreamCache->EnqueueUpstream({.Type = ZenContentType::kCbPackage, + .CacheKey = {Ref.BucketSegment, Ref.HashKey}, + .PayloadIds = std::move(ValidAttachments)}); + } + + Request.WriteResponse(HttpResponseCode::Created); + } + else + { + Request.WriteResponse(HttpResponseCode::BadRequest); } } |