aboutsummaryrefslogtreecommitdiff
path: root/zenserver/cache/structuredcache.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2022-12-01 21:55:17 +0100
committerGitHub <[email protected]>2022-12-01 12:55:17 -0800
commit7d448505cf8a63e9e3f4ed6d606693daa1cf584b (patch)
treee3d108e835b6cd5b8d814d0e1f077c813acf3bae /zenserver/cache/structuredcache.cpp
parent0.1.9 (diff)
downloadzen-7d448505cf8a63e9e3f4ed6d606693daa1cf584b.tar.xz
zen-7d448505cf8a63e9e3f4ed6d606693daa1cf584b.zip
Make sure we always store record/op before attachments (#195)
* Make sure we always store record/op before attachments We don't want to store attachments first - a GC operation could then remove attachments if triggered before storing record/op * zen::Latch * Use latch to wait for attachments to be stored * use zen::latch when adding attachments from project oplog import * changelog
Diffstat (limited to 'zenserver/cache/structuredcache.cpp')
-rw-r--r--zenserver/cache/structuredcache.cpp231
1 files changed, 137 insertions, 94 deletions
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp
index dfb69c0fe..0df7472ac 100644
--- a/zenserver/cache/structuredcache.cpp
+++ b/zenserver/cache/structuredcache.cpp
@@ -651,44 +651,63 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request
CbPackage Package;
if (Package.TryLoad(ClientResultValue.Value))
{
- CbObject CacheRecord = Package.GetObject();
- AttachmentCount Count;
-
- CacheRecord.IterateAttachments([this, &Package, &Ref, &Count, QueryLocal, StoreLocal](CbFieldView HashView) {
- if (const CbAttachment* Attachment = Package.FindAttachment(HashView.AsHash()))
- {
- if (CompressedBuffer Compressed = Attachment->AsCompressedBinary())
+ CbObject CacheRecord = Package.GetObject();
+ AttachmentCount Count;
+ size_t NumAttachments = Package.GetAttachments().size();
+ std::vector<const CbAttachment*> AttachmentsToStoreLocally;
+ AttachmentsToStoreLocally.reserve(NumAttachments);
+
+ CacheRecord.IterateAttachments(
+ [this, &Package, &Ref, &AttachmentsToStoreLocally, &Count, QueryLocal, StoreLocal, SkipData](CbFieldView HashView) {
+ IoHash Hash = HashView.AsHash();
+ if (const CbAttachment* Attachment = Package.FindAttachment(Hash))
{
- if (StoreLocal)
+ if (Attachment->IsCompressedBinary())
{
- auto InsertResult = m_CidStore.AddChunk(Compressed);
- if (InsertResult.New)
+ if (StoreLocal)
{
- Count.New++;
+ AttachmentsToStoreLocally.emplace_back(Attachment);
}
+ Count.Valid++;
+ }
+ else
+ {
+ ZEN_WARN("Uncompressed value '{}' from upstream cache record '{}/{}'",
+ Hash,
+ Ref.BucketSegment,
+ Ref.HashKey);
+ Count.Invalid++;
}
- Count.Valid++;
- }
- else
- {
- ZEN_WARN("Uncompressed value '{}' from upstream cache record '{}/{}'",
- HashView.AsHash(),
- Ref.BucketSegment,
- Ref.HashKey);
- Count.Invalid++;
}
- }
- else if (QueryLocal)
- {
- if (IoBuffer Chunk = m_CidStore.FindChunkByCid(HashView.AsHash()))
+ else if (QueryLocal)
{
- Package.AddAttachment(
- CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Chunk)), HashView.AsHash()));
- Count.Valid++;
+ if (SkipData)
+ {
+ if (m_CidStore.ContainsChunk(Hash))
+ {
+ Count.Valid++;
+ }
+ }
+ else if (IoBuffer Chunk = m_CidStore.FindChunkByCid(Hash))
+ {
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Chunk));
+ if (Compressed)
+ {
+ Package.AddAttachment(CbAttachment(Compressed, Hash));
+ Count.Valid++;
+ }
+ else
+ {
+ ZEN_WARN("Uncompressed value '{}' stored in local cache '{}/{}'",
+ Hash,
+ Ref.BucketSegment,
+ Ref.HashKey);
+ Count.Invalid++;
+ }
+ }
}
- }
- Count.Total++;
- });
+ Count.Total++;
+ });
if ((Count.Valid == Count.Total) || PartialRecord)
{
@@ -701,6 +720,16 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request
m_CacheStore.Put(Ref.Namespace, Ref.BucketSegment, Ref.HashKey, CacheValue);
}
+ for (const CbAttachment* Attachment : AttachmentsToStoreLocally)
+ {
+ CompressedBuffer Chunk = Attachment->AsCompressedBinary();
+ CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Chunk);
+ if (InsertResult.New)
+ {
+ Count.New++;
+ }
+ }
+
BinaryWriter MemStream;
if (SkipData)
{
@@ -822,7 +851,9 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request
return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Compact binary validation failed"sv);
}
- CachePolicy Policy = PolicyFromUrl;
+ Body.SetContentType(ZenContentType::kCbObject);
+ m_CacheStore.Put(Ref.Namespace, Ref.BucketSegment, Ref.HashKey, {.Value = Body});
+
CbObjectView CacheRecord(Body.Data());
std::vector<IoHash> ValidAttachments;
int32_t TotalCount = 0;
@@ -846,11 +877,9 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request
ValidAttachments.size(),
NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
- Body.SetContentType(ZenContentType::kCbObject);
- m_CacheStore.Put(Ref.Namespace, Ref.BucketSegment, Ref.HashKey, {.Value = Body});
-
const bool IsPartialRecord = TotalCount != static_cast<int32_t>(ValidAttachments.size());
+ CachePolicy Policy = PolicyFromUrl;
if (EnumHasAllFlags(Policy, CachePolicy::StoreRemote) && !IsPartialRecord)
{
m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCbObject,
@@ -876,27 +905,23 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request
}
CachePolicy Policy = PolicyFromUrl;
- CbObject CacheRecord = Package.GetObject();
- AttachmentCount Count;
- std::vector<IoHash> ValidAttachments;
+ CbObject CacheRecord = Package.GetObject();
- ValidAttachments.reserve(Package.GetAttachments().size());
+ AttachmentCount Count;
+ size_t NumAttachments = Package.GetAttachments().size();
+ std::vector<IoHash> ValidAttachments;
+ std::vector<const CbAttachment*> AttachmentsToStoreLocally;
+ ValidAttachments.reserve(NumAttachments);
+ AttachmentsToStoreLocally.reserve(NumAttachments);
- CacheRecord.IterateAttachments([this, &Ref, &Package, &ValidAttachments, &Count](CbFieldView HashView) {
+ CacheRecord.IterateAttachments([this, &Ref, &Package, &AttachmentsToStoreLocally, &ValidAttachments, &Count](CbFieldView HashView) {
const IoHash Hash = HashView.AsHash();
if (const CbAttachment* Attachment = Package.FindAttachment(Hash))
{
if (Attachment->IsCompressedBinary())
{
- CompressedBuffer Chunk = Attachment->AsCompressedBinary();
- CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Chunk);
-
+ AttachmentsToStoreLocally.emplace_back(Attachment);
ValidAttachments.emplace_back(Hash);
-
- if (InsertResult.New)
- {
- Count.New++;
- }
Count.Valid++;
}
else
@@ -923,6 +948,21 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request
return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid attachment(s)"sv);
}
+ ZenCacheValue CacheValue;
+ CacheValue.Value = CacheRecord.GetBuffer().AsIoBuffer();
+ CacheValue.Value.SetContentType(ZenContentType::kCbObject);
+ m_CacheStore.Put(Ref.Namespace, Ref.BucketSegment, Ref.HashKey, CacheValue);
+
+ for (const CbAttachment* Attachment : AttachmentsToStoreLocally)
+ {
+ CompressedBuffer Chunk = Attachment->AsCompressedBinary();
+ CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Chunk);
+ if (InsertResult.New)
+ {
+ Count.New++;
+ }
+ }
+
ZEN_DEBUG("PUTCACHERECORD - '{}/{}/{}' {} '{}', attachments '{}/{}/{}' (new/valid/total) in {}",
Ref.Namespace,
Ref.BucketSegment,
@@ -934,12 +974,6 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request
Count.Total,
NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
- ZenCacheValue CacheValue;
- CacheValue.Value = CacheRecord.GetBuffer().AsIoBuffer();
- CacheValue.Value.SetContentType(ZenContentType::kCbObject);
-
- m_CacheStore.Put(Ref.Namespace, Ref.BucketSegment, Ref.HashKey, CacheValue);
-
const bool IsPartialRecord = Count.Valid != Count.Total;
if (EnumHasAllFlags(Policy, CachePolicy::StoreRemote) && !IsPartialRecord)
@@ -1233,56 +1267,71 @@ HttpStructuredCacheService::HandleRpcPutCacheRecords(zen::HttpServerRequest& Req
HttpStructuredCacheService::PutResult
HttpStructuredCacheService::PutCacheRecord(PutRequestData& Request, const CbPackage* Package)
{
- std::vector<IoHash> ValidAttachments;
- AttachmentCount Count;
- CbObjectView Record = Request.RecordObject;
- uint64_t RecordObjectSize = Record.GetSize();
- uint64_t TransferredSize = RecordObjectSize;
+ CbObjectView Record = Request.RecordObject;
+ uint64_t RecordObjectSize = Record.GetSize();
+ uint64_t TransferredSize = RecordObjectSize;
+
+ AttachmentCount Count;
+ size_t NumAttachments = Package->GetAttachments().size();
+ std::vector<IoHash> ValidAttachments;
+ std::vector<const CbAttachment*> AttachmentsToStoreLocally;
+ ValidAttachments.reserve(NumAttachments);
+ AttachmentsToStoreLocally.reserve(NumAttachments);
Stopwatch Timer;
- Request.RecordObject.IterateAttachments([this, &Request, Package, &ValidAttachments, &Count, &TransferredSize](CbFieldView HashView) {
- const IoHash ValueHash = HashView.AsHash();
- if (const CbAttachment* Attachment = Package ? Package->FindAttachment(ValueHash) : nullptr)
- {
- if (Attachment->IsCompressedBinary())
+ Request.RecordObject.IterateAttachments(
+ [this, &Request, Package, &AttachmentsToStoreLocally, &ValidAttachments, &Count, &TransferredSize](CbFieldView HashView) {
+ const IoHash ValueHash = HashView.AsHash();
+ if (const CbAttachment* Attachment = Package ? Package->FindAttachment(ValueHash) : nullptr)
{
- CompressedBuffer Chunk = Attachment->AsCompressedBinary();
- CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Chunk);
-
- ValidAttachments.emplace_back(ValueHash);
-
- if (InsertResult.New)
+ if (Attachment->IsCompressedBinary())
{
- Count.New++;
+ AttachmentsToStoreLocally.emplace_back(Attachment);
+ ValidAttachments.emplace_back(ValueHash);
+ Count.Valid++;
+ }
+ else
+ {
+ ZEN_WARN("PUTCACEHRECORD - '{}/{}/{}' '{}' FAILED, attachment '{}' is not compressed",
+ Request.Namespace,
+ Request.Key.Bucket,
+ Request.Key.Hash,
+ ToString(HttpContentType::kCbPackage),
+ ValueHash);
+ Count.Invalid++;
}
- Count.Valid++;
- TransferredSize += Chunk.GetCompressedSize();
}
- else
+ else if (m_CidStore.ContainsChunk(ValueHash))
{
- ZEN_WARN("PUTCACEHRECORD - '{}/{}/{}' '{}' FAILED, attachment '{}' is not compressed",
- Request.Namespace,
- Request.Key.Bucket,
- Request.Key.Hash,
- ToString(HttpContentType::kCbPackage),
- ValueHash);
- Count.Invalid++;
+ ValidAttachments.emplace_back(ValueHash);
+ Count.Valid++;
}
- }
- else if (m_CidStore.ContainsChunk(ValueHash))
- {
- ValidAttachments.emplace_back(ValueHash);
- Count.Valid++;
- }
- Count.Total++;
- });
+ Count.Total++;
+ });
if (Count.Invalid > 0)
{
return PutResult::Invalid;
}
+ ZenCacheValue CacheValue;
+ CacheValue.Value = IoBuffer(Record.GetSize());
+ Record.CopyTo(MutableMemoryView(CacheValue.Value.MutableData(), CacheValue.Value.GetSize()));
+ CacheValue.Value.SetContentType(ZenContentType::kCbObject);
+ m_CacheStore.Put(Request.Namespace, Request.Key.Bucket, Request.Key.Hash, CacheValue);
+
+ for (const CbAttachment* Attachment : AttachmentsToStoreLocally)
+ {
+ CompressedBuffer Chunk = Attachment->AsCompressedBinary();
+ CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Chunk);
+ if (InsertResult.New)
+ {
+ Count.New++;
+ }
+ TransferredSize += Chunk.GetCompressedSize();
+ }
+
ZEN_DEBUG("PUTCACEHRECORD - '{}/{}/{}' {}, attachments '{}/{}/{}' (new/valid/total) in {}",
Request.Namespace,
Request.Key.Bucket,
@@ -1293,12 +1342,6 @@ HttpStructuredCacheService::PutCacheRecord(PutRequestData& Request, const CbPack
Count.Total,
NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
- ZenCacheValue CacheValue;
- CacheValue.Value = IoBuffer(Record.GetSize());
- Record.CopyTo(MutableMemoryView(CacheValue.Value.MutableData(), CacheValue.Value.GetSize()));
- CacheValue.Value.SetContentType(ZenContentType::kCbObject);
- m_CacheStore.Put(Request.Namespace, Request.Key.Bucket, Request.Key.Hash, CacheValue);
-
const bool IsPartialRecord = Count.Valid != Count.Total;
if (EnumHasAllFlags(Request.Policy.GetRecordPolicy(), CachePolicy::StoreRemote) && !IsPartialRecord)