diff options
Diffstat (limited to 'zenserver/cache/structuredcache.cpp')
| -rw-r--r-- | zenserver/cache/structuredcache.cpp | 231 |
1 files changed, 137 insertions, 94 deletions
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp index dfb69c0fe..0df7472ac 100644 --- a/zenserver/cache/structuredcache.cpp +++ b/zenserver/cache/structuredcache.cpp @@ -651,44 +651,63 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request CbPackage Package; if (Package.TryLoad(ClientResultValue.Value)) { - CbObject CacheRecord = Package.GetObject(); - AttachmentCount Count; - - CacheRecord.IterateAttachments([this, &Package, &Ref, &Count, QueryLocal, StoreLocal](CbFieldView HashView) { - if (const CbAttachment* Attachment = Package.FindAttachment(HashView.AsHash())) - { - if (CompressedBuffer Compressed = Attachment->AsCompressedBinary()) + CbObject CacheRecord = Package.GetObject(); + AttachmentCount Count; + size_t NumAttachments = Package.GetAttachments().size(); + std::vector<const CbAttachment*> AttachmentsToStoreLocally; + AttachmentsToStoreLocally.reserve(NumAttachments); + + CacheRecord.IterateAttachments( + [this, &Package, &Ref, &AttachmentsToStoreLocally, &Count, QueryLocal, StoreLocal, SkipData](CbFieldView HashView) { + IoHash Hash = HashView.AsHash(); + if (const CbAttachment* Attachment = Package.FindAttachment(Hash)) { - if (StoreLocal) + if (Attachment->IsCompressedBinary()) { - auto InsertResult = m_CidStore.AddChunk(Compressed); - if (InsertResult.New) + if (StoreLocal) { - Count.New++; + AttachmentsToStoreLocally.emplace_back(Attachment); } + Count.Valid++; + } + else + { + ZEN_WARN("Uncompressed value '{}' from upstream cache record '{}/{}'", + Hash, + Ref.BucketSegment, + Ref.HashKey); + Count.Invalid++; } - Count.Valid++; - } - else - { - ZEN_WARN("Uncompressed value '{}' from upstream cache record '{}/{}'", - HashView.AsHash(), - Ref.BucketSegment, - Ref.HashKey); - Count.Invalid++; } - } - else if (QueryLocal) - { - if (IoBuffer Chunk = m_CidStore.FindChunkByCid(HashView.AsHash())) + else if (QueryLocal) { - Package.AddAttachment( - CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Chunk)), HashView.AsHash())); - Count.Valid++; + if (SkipData) + { + if (m_CidStore.ContainsChunk(Hash)) + { + Count.Valid++; + } + } + else if (IoBuffer Chunk = m_CidStore.FindChunkByCid(Hash)) + { + CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Chunk)); + if (Compressed) + { + Package.AddAttachment(CbAttachment(Compressed, Hash)); + Count.Valid++; + } + else + { + ZEN_WARN("Uncompressed value '{}' stored in local cache '{}/{}'", + Hash, + Ref.BucketSegment, + Ref.HashKey); + Count.Invalid++; + } + } } - } - Count.Total++; - }); + Count.Total++; + }); if ((Count.Valid == Count.Total) || PartialRecord) { @@ -701,6 +720,16 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request m_CacheStore.Put(Ref.Namespace, Ref.BucketSegment, Ref.HashKey, CacheValue); } + for (const CbAttachment* Attachment : AttachmentsToStoreLocally) + { + CompressedBuffer Chunk = Attachment->AsCompressedBinary(); + CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Chunk); + if (InsertResult.New) + { + Count.New++; + } + } + BinaryWriter MemStream; if (SkipData) { @@ -822,7 +851,9 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Compact binary validation failed"sv); } - CachePolicy Policy = PolicyFromUrl; + Body.SetContentType(ZenContentType::kCbObject); + m_CacheStore.Put(Ref.Namespace, Ref.BucketSegment, Ref.HashKey, {.Value = Body}); + CbObjectView CacheRecord(Body.Data()); std::vector<IoHash> ValidAttachments; int32_t TotalCount = 0; @@ -846,11 +877,9 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request ValidAttachments.size(), NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); - Body.SetContentType(ZenContentType::kCbObject); - m_CacheStore.Put(Ref.Namespace, Ref.BucketSegment, Ref.HashKey, {.Value = Body}); - const bool IsPartialRecord = TotalCount != static_cast<int32_t>(ValidAttachments.size()); + CachePolicy Policy = PolicyFromUrl; if (EnumHasAllFlags(Policy, CachePolicy::StoreRemote) && !IsPartialRecord) { m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCbObject, @@ -876,27 +905,23 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request } CachePolicy Policy = PolicyFromUrl; - CbObject CacheRecord = Package.GetObject(); - AttachmentCount Count; - std::vector<IoHash> ValidAttachments; + CbObject CacheRecord = Package.GetObject(); - ValidAttachments.reserve(Package.GetAttachments().size()); + AttachmentCount Count; + size_t NumAttachments = Package.GetAttachments().size(); + std::vector<IoHash> ValidAttachments; + std::vector<const CbAttachment*> AttachmentsToStoreLocally; + ValidAttachments.reserve(NumAttachments); + AttachmentsToStoreLocally.reserve(NumAttachments); - CacheRecord.IterateAttachments([this, &Ref, &Package, &ValidAttachments, &Count](CbFieldView HashView) { + CacheRecord.IterateAttachments([this, &Ref, &Package, &AttachmentsToStoreLocally, &ValidAttachments, &Count](CbFieldView HashView) { const IoHash Hash = HashView.AsHash(); if (const CbAttachment* Attachment = Package.FindAttachment(Hash)) { if (Attachment->IsCompressedBinary()) { - CompressedBuffer Chunk = Attachment->AsCompressedBinary(); - CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Chunk); - + AttachmentsToStoreLocally.emplace_back(Attachment); ValidAttachments.emplace_back(Hash); - - if (InsertResult.New) - { - Count.New++; - } Count.Valid++; } else @@ -923,6 +948,21 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid attachment(s)"sv); } + ZenCacheValue CacheValue; + CacheValue.Value = CacheRecord.GetBuffer().AsIoBuffer(); + CacheValue.Value.SetContentType(ZenContentType::kCbObject); + m_CacheStore.Put(Ref.Namespace, Ref.BucketSegment, Ref.HashKey, CacheValue); + + for (const CbAttachment* Attachment : AttachmentsToStoreLocally) + { + CompressedBuffer Chunk = Attachment->AsCompressedBinary(); + CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Chunk); + if (InsertResult.New) + { + Count.New++; + } + } + ZEN_DEBUG("PUTCACHERECORD - '{}/{}/{}' {} '{}', attachments '{}/{}/{}' (new/valid/total) in {}", Ref.Namespace, Ref.BucketSegment, @@ -934,12 +974,6 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request Count.Total, NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); - ZenCacheValue CacheValue; - CacheValue.Value = CacheRecord.GetBuffer().AsIoBuffer(); - CacheValue.Value.SetContentType(ZenContentType::kCbObject); - - m_CacheStore.Put(Ref.Namespace, Ref.BucketSegment, Ref.HashKey, CacheValue); - const bool IsPartialRecord = Count.Valid != Count.Total; if (EnumHasAllFlags(Policy, CachePolicy::StoreRemote) && !IsPartialRecord) @@ -1233,56 +1267,71 @@ HttpStructuredCacheService::HandleRpcPutCacheRecords(zen::HttpServerRequest& Req HttpStructuredCacheService::PutResult HttpStructuredCacheService::PutCacheRecord(PutRequestData& Request, const CbPackage* Package) { - std::vector<IoHash> ValidAttachments; - AttachmentCount Count; - CbObjectView Record = Request.RecordObject; - uint64_t RecordObjectSize = Record.GetSize(); - uint64_t TransferredSize = RecordObjectSize; + CbObjectView Record = Request.RecordObject; + uint64_t RecordObjectSize = Record.GetSize(); + uint64_t TransferredSize = RecordObjectSize; + + AttachmentCount Count; + size_t NumAttachments = Package->GetAttachments().size(); + std::vector<IoHash> ValidAttachments; + std::vector<const CbAttachment*> AttachmentsToStoreLocally; + ValidAttachments.reserve(NumAttachments); + AttachmentsToStoreLocally.reserve(NumAttachments); Stopwatch Timer; - Request.RecordObject.IterateAttachments([this, &Request, Package, &ValidAttachments, &Count, &TransferredSize](CbFieldView HashView) { - const IoHash ValueHash = HashView.AsHash(); - if (const CbAttachment* Attachment = Package ? Package->FindAttachment(ValueHash) : nullptr) - { - if (Attachment->IsCompressedBinary()) + Request.RecordObject.IterateAttachments( + [this, &Request, Package, &AttachmentsToStoreLocally, &ValidAttachments, &Count, &TransferredSize](CbFieldView HashView) { + const IoHash ValueHash = HashView.AsHash(); + if (const CbAttachment* Attachment = Package ? Package->FindAttachment(ValueHash) : nullptr) { - CompressedBuffer Chunk = Attachment->AsCompressedBinary(); - CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Chunk); - - ValidAttachments.emplace_back(ValueHash); - - if (InsertResult.New) + if (Attachment->IsCompressedBinary()) { - Count.New++; + AttachmentsToStoreLocally.emplace_back(Attachment); + ValidAttachments.emplace_back(ValueHash); + Count.Valid++; + } + else + { + ZEN_WARN("PUTCACEHRECORD - '{}/{}/{}' '{}' FAILED, attachment '{}' is not compressed", + Request.Namespace, + Request.Key.Bucket, + Request.Key.Hash, + ToString(HttpContentType::kCbPackage), + ValueHash); + Count.Invalid++; } - Count.Valid++; - TransferredSize += Chunk.GetCompressedSize(); } - else + else if (m_CidStore.ContainsChunk(ValueHash)) { - ZEN_WARN("PUTCACEHRECORD - '{}/{}/{}' '{}' FAILED, attachment '{}' is not compressed", - Request.Namespace, - Request.Key.Bucket, - Request.Key.Hash, - ToString(HttpContentType::kCbPackage), - ValueHash); - Count.Invalid++; + ValidAttachments.emplace_back(ValueHash); + Count.Valid++; } - } - else if (m_CidStore.ContainsChunk(ValueHash)) - { - ValidAttachments.emplace_back(ValueHash); - Count.Valid++; - } - Count.Total++; - }); + Count.Total++; + }); if (Count.Invalid > 0) { return PutResult::Invalid; } + ZenCacheValue CacheValue; + CacheValue.Value = IoBuffer(Record.GetSize()); + Record.CopyTo(MutableMemoryView(CacheValue.Value.MutableData(), CacheValue.Value.GetSize())); + CacheValue.Value.SetContentType(ZenContentType::kCbObject); + m_CacheStore.Put(Request.Namespace, Request.Key.Bucket, Request.Key.Hash, CacheValue); + + for (const CbAttachment* Attachment : AttachmentsToStoreLocally) + { + CompressedBuffer Chunk = Attachment->AsCompressedBinary(); + CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Chunk); + if (InsertResult.New) + { + Count.New++; + } + TransferredSize += Chunk.GetCompressedSize(); + } + ZEN_DEBUG("PUTCACEHRECORD - '{}/{}/{}' {}, attachments '{}/{}/{}' (new/valid/total) in {}", Request.Namespace, Request.Key.Bucket, @@ -1293,12 +1342,6 @@ HttpStructuredCacheService::PutCacheRecord(PutRequestData& Request, const CbPack Count.Total, NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); - ZenCacheValue CacheValue; - CacheValue.Value = IoBuffer(Record.GetSize()); - Record.CopyTo(MutableMemoryView(CacheValue.Value.MutableData(), CacheValue.Value.GetSize())); - CacheValue.Value.SetContentType(ZenContentType::kCbObject); - m_CacheStore.Put(Request.Namespace, Request.Key.Bucket, Request.Key.Hash, CacheValue); - const bool IsPartialRecord = Count.Valid != Count.Total; if (EnumHasAllFlags(Request.Policy.GetRecordPolicy(), CachePolicy::StoreRemote) && !IsPartialRecord) |