diff options
Diffstat (limited to 'src/zenstore/cache/cacherpc.cpp')
| -rw-r--r-- | src/zenstore/cache/cacherpc.cpp | 234 |
1 files changed, 152 insertions, 82 deletions
diff --git a/src/zenstore/cache/cacherpc.cpp b/src/zenstore/cache/cacherpc.cpp index de4b0a37c..5d9a68919 100644 --- a/src/zenstore/cache/cacherpc.cpp +++ b/src/zenstore/cache/cacherpc.cpp @@ -334,13 +334,13 @@ CacheRpcHandler::HandleRpcPutCacheRecords(const CacheRequestContext& Context, co .Policy = std::move(Policy), .Context = Context}; - PutResult Result = PutCacheRecord(PutRequest, &BatchRequest); + PutStatus Result = PutCacheRecord(PutRequest, &BatchRequest); - if (Result == PutResult::Invalid) + if (Result == PutStatus::Invalid) { return CbPackage{}; } - Results.push_back(Result == PutResult::Success); + Results.push_back(Result == PutStatus::Success); } if (Results.empty()) { @@ -360,7 +360,7 @@ CacheRpcHandler::HandleRpcPutCacheRecords(const CacheRequestContext& Context, co return RpcResponse; } -PutResult +PutStatus CacheRpcHandler::PutCacheRecord(PutRequestData& Request, const CbPackage* Package) { CbObjectView Record = Request.RecordObject; @@ -422,14 +422,28 @@ CacheRpcHandler::PutCacheRecord(PutRequestData& Request, const CbPackage* Packag if (Count.Invalid > 0) { - return PutResult::Invalid; + return PutStatus::Invalid; } ZenCacheValue CacheValue; CacheValue.Value = IoBuffer(Record.GetSize()); Record.CopyTo(MutableMemoryView(CacheValue.Value.MutableData(), CacheValue.Value.GetSize())); CacheValue.Value.SetContentType(ZenContentType::kCbObject); - m_CacheStore.Put(Request.Context, Request.Namespace, Request.Key.Bucket, Request.Key.Hash, CacheValue, ReferencedAttachments, nullptr); + bool Overwrite = EnumHasAllFlags(Request.Policy.GetRecordPolicy(), CachePolicy::StoreLocal) && + !EnumHasAllFlags(Request.Policy.GetRecordPolicy(), CachePolicy::QueryLocal); + // TODO: Propagation for rejected PUTs + ZenCacheStore::PutResult PutResult = m_CacheStore.Put(Request.Context, + Request.Namespace, + Request.Key.Bucket, + Request.Key.Hash, + CacheValue, + ReferencedAttachments, + Overwrite, + nullptr); + if (PutResult.Status != zen::PutStatus::Success) + { + return PutResult.Status; + } m_CacheStats.WriteCount++; if (!WriteAttachmentBuffers.empty()) @@ -466,7 +480,7 @@ CacheRpcHandler::PutCacheRecord(PutRequestData& Request, const CbPackage* Packag .Key = Request.Key, .ValueContentIds = std::move(ValidAttachments)}); } - return PutResult::Success; + return PutStatus::Success; } CbPackage @@ -765,18 +779,24 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb EnumHasAllFlags(Request.DownstreamPolicy.GetRecordPolicy(), CachePolicy::StoreLocal) && AreDiskWritesAllowed(); if (StoreLocal) { + bool Overwrite = !EnumHasAllFlags(Request.DownstreamPolicy.GetRecordPolicy(), CachePolicy::QueryLocal); std::vector<IoHash> ReferencedAttachments; ObjectBuffer.IterateAttachments([&ReferencedAttachments](CbFieldView HashView) { const IoHash ValueHash = HashView.AsHash(); ReferencedAttachments.push_back(ValueHash); }); - m_CacheStore.Put(Context, - *Namespace, - Key.Bucket, - Key.Hash, - {.Value = {Request.RecordCacheValue}}, - ReferencedAttachments, - nullptr); + ZenCacheStore::PutResult PutResult = m_CacheStore.Put(Context, + *Namespace, + Key.Bucket, + Key.Hash, + {.Value = {Request.RecordCacheValue}}, + ReferencedAttachments, + Overwrite, + nullptr); + if (PutResult.Status != zen::PutStatus::Success) + { + return; + } m_CacheStats.WriteCount++; } ParseValues(Request); @@ -924,10 +944,10 @@ CacheRpcHandler::HandleRpcPutCacheValues(const CacheRequestContext& Context, con const bool HasUpstream = m_UpstreamCache.IsActive(); CbArrayView RequestsArray = Params["Requests"sv].AsArrayView(); - std::vector<bool> BatchResults; - eastl::fixed_vector<size_t, 32> BatchResultIndexes; - eastl::fixed_vector<bool, 32> Results; - eastl::fixed_vector<CacheKey, 32> UpstreamCacheKeys; + std::vector<ZenCacheStore::PutResult> BatchResults; + eastl::fixed_vector<size_t, 32> BatchResultIndexes; + eastl::fixed_vector<ZenCacheStore::PutResult, 32> Results; + eastl::fixed_vector<CacheKey, 32> UpstreamCacheKeys; uint64_t RequestCount = RequestsArray.Num(); { @@ -977,34 +997,39 @@ CacheRpcHandler::HandleRpcPutCacheValues(const CacheRequestContext& Context, con if (EnumHasAllFlags(Policy, CachePolicy::StoreLocal)) { - IoBuffer Value = Chunk.GetCompressed().Flatten().AsIoBuffer(); + bool Overwrite = !EnumHasAllFlags(Policy, CachePolicy::QueryLocal); + IoBuffer Value = Chunk.GetCompressed().Flatten().AsIoBuffer(); Value.SetContentType(ZenContentType::kCompressedBinary); if (RawSize == 0) { RawSize = Chunk.DecodeRawSize(); } - m_CacheStore.Put(Context, - *Namespace, - Key.Bucket, - Key.Hash, - {.Value = Value, .RawSize = RawSize, .RawHash = RawHash}, - {}, - Batch.get()); - m_CacheStats.WriteCount++; + ZenCacheStore::PutResult PutResult = m_CacheStore.Put(Context, + *Namespace, + Key.Bucket, + Key.Hash, + {.Value = Value, .RawSize = RawSize, .RawHash = RawHash}, + {}, + Overwrite, + Batch.get()); + if (PutResult.Status == zen::PutStatus::Success) + { + m_CacheStats.WriteCount++; + } if (Batch) { BatchResultIndexes.push_back(Results.size()); - Results.push_back(false); + Results.push_back({zen::PutStatus::Fail}); } else { - Results.push_back(true); + Results.push_back(PutResult); } TransferredSize = Chunk.GetCompressedSize(); } else { - Results.push_back(true); + Results.push_back({zen::PutStatus::Success}); } Valid = true; } @@ -1020,12 +1045,12 @@ CacheRpcHandler::HandleRpcPutCacheValues(const CacheRequestContext& Context, con if (m_CacheStore.Get(Context, *Namespace, Key.Bucket, Key.Hash, ExistingValue) && IsCompressedBinary(ExistingValue.Value.GetContentType())) { - Results.push_back(true); + Results.push_back({zen::PutStatus::Success}); Valid = true; } else { - Results.push_back(false); + Results.push_back({zen::PutStatus::Fail, fmt::format("Missing attachment with raw hash {}", RawHash)}); } } // We do not search the Upstream. No data in a put means the caller is probing for whether they need to do a heavy put. @@ -1060,13 +1085,13 @@ CacheRpcHandler::HandleRpcPutCacheValues(const CacheRequestContext& Context, con { size_t BatchResultIndex = BatchResultIndexes[Index]; ZEN_ASSERT(BatchResultIndex < Results.size()); - ZEN_ASSERT(Results[BatchResultIndex] == false); + ZEN_ASSERT(Results[BatchResultIndex].Status != zen::PutStatus::Success); Results[BatchResultIndex] = BatchResults[Index]; } for (std::size_t Index = 0; Index < Results.size(); Index++) { - if (Results[Index] && UpstreamCacheKeys[Index] != CacheKey::Empty) + if ((Results[Index].Status == zen::PutStatus::Success) && UpstreamCacheKeys[Index] != CacheKey::Empty) { m_UpstreamCache.EnqueueUpstream( {.Type = ZenContentType::kCompressedBinary, .Namespace = *Namespace, .Key = UpstreamCacheKeys[Index]}); @@ -1076,11 +1101,32 @@ CacheRpcHandler::HandleRpcPutCacheValues(const CacheRequestContext& Context, con ZEN_TRACE_CPU("Z$::RpcPutCacheValues::Response"); CbObjectWriter ResponseObject{1024}; ResponseObject.BeginArray("Result"sv); - for (bool Value : Results) + bool bAnyErrors = false; + for (const ZenCacheStore::PutResult& Value : Results) { - ResponseObject.AddBool(Value); + if (Value.Status == zen::PutStatus::Success) + { + ResponseObject.AddBool(true); + } + else + { + bAnyErrors = true; + ResponseObject.AddBool(false); + } } ResponseObject.EndArray(); + if (bAnyErrors) + { + ResponseObject.BeginArray("ErrorMessages"sv); + for (const ZenCacheStore::PutResult& Value : Results) + { + if (Value.Status != zen::PutStatus::Success) + { + ResponseObject.AddString(Value.Message); + } + } + ResponseObject.EndArray(); + } CbPackage RpcResponse; RpcResponse.SetObject(ResponseObject.Save()); @@ -1239,6 +1285,7 @@ CacheRpcHandler::HandleRpcGetCacheValues(const CacheRequestContext& Context, CbO const bool HasData = IsCompressedBinary(Params.Value.GetContentType()); const bool SkipData = EnumHasAllFlags(Request.Policy, CachePolicy::SkipData); const bool StoreData = EnumHasAllFlags(Request.Policy, CachePolicy::StoreLocal) && AreDiskWritesAllowed(); + const bool Overwrite = StoreData && !EnumHasAllFlags(Request.Policy, CachePolicy::QueryLocal); const bool IsHit = SkipData || HasData; if (IsHit) { @@ -1249,14 +1296,19 @@ CacheRpcHandler::HandleRpcGetCacheValues(const CacheRequestContext& Context, CbO if (HasData && StoreData) { - m_CacheStore.Put(Context, - *Namespace, - Request.Key.Bucket, - Request.Key.Hash, - ZenCacheValue{.Value = Params.Value, .RawSize = Request.RawSize, .RawHash = Request.RawHash}, - {}, - nullptr); - m_CacheStats.WriteCount++; + ZenCacheStore::PutResult PutResult = m_CacheStore.Put( + Context, + *Namespace, + Request.Key.Bucket, + Request.Key.Hash, + ZenCacheValue{.Value = Params.Value, .RawSize = Request.RawSize, .RawHash = Request.RawHash}, + {}, + Overwrite, + nullptr); + if (PutResult.Status == zen::PutStatus::Success) + { + m_CacheStats.WriteCount++; + } } ZEN_DEBUG("GETCACHEVALUES HIT - '{}/{}/{}' {} ({}) in {}", @@ -1527,36 +1579,48 @@ CacheRpcHandler::GetLocalCacheRecords(const CacheRequestContext& Context, if (!UpstreamRecordRequests.empty()) { - const auto OnCacheRecordGetComplete = [this, Namespace, &RecordKeys, &Records, &RecordRequests, Context]( - CacheRecordGetCompleteParams&& Params) { - if (!Params.Record) - { - return; - } - CacheKeyRequest& RecordKey = Params.Request; - size_t RecordIndex = std::distance(RecordKeys.data(), &RecordKey); - RecordRequests[RecordIndex]->ElapsedTimeUs += static_cast<uint64_t>(Params.ElapsedSeconds * 1000000.0); - RecordBody& Record = Records[RecordIndex]; - - const CacheKey& Key = RecordKey.Key; - Record.Exists = true; - CbObject ObjectBuffer = CbObject::Clone(Params.Record); - Record.CacheValue = ObjectBuffer.GetBuffer().AsIoBuffer(); - Record.CacheValue.SetContentType(ZenContentType::kCbObject); - Record.Source = Params.Source; - - bool StoreLocal = EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::StoreLocal) && AreDiskWritesAllowed(); - if (StoreLocal) - { - std::vector<IoHash> ReferencedAttachments; - ObjectBuffer.IterateAttachments([&ReferencedAttachments](CbFieldView HashView) { - const IoHash ValueHash = HashView.AsHash(); - ReferencedAttachments.push_back(ValueHash); - }); - m_CacheStore.Put(Context, Namespace, Key.Bucket, Key.Hash, {.Value = Record.CacheValue}, ReferencedAttachments, nullptr); - m_CacheStats.WriteCount++; - } - }; + const auto OnCacheRecordGetComplete = + [this, Namespace, &RecordKeys, &Records, &RecordRequests, Context](CacheRecordGetCompleteParams&& Params) { + if (!Params.Record) + { + return; + } + CacheKeyRequest& RecordKey = Params.Request; + size_t RecordIndex = std::distance(RecordKeys.data(), &RecordKey); + RecordRequests[RecordIndex]->ElapsedTimeUs += static_cast<uint64_t>(Params.ElapsedSeconds * 1000000.0); + RecordBody& Record = Records[RecordIndex]; + + const CacheKey& Key = RecordKey.Key; + Record.Exists = true; + CbObject ObjectBuffer = CbObject::Clone(Params.Record); + Record.CacheValue = ObjectBuffer.GetBuffer().AsIoBuffer(); + Record.CacheValue.SetContentType(ZenContentType::kCbObject); + Record.Source = Params.Source; + + bool StoreLocal = EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::StoreLocal) && AreDiskWritesAllowed(); + if (StoreLocal) + { + bool Overwrite = !EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::QueryLocal); + std::vector<IoHash> ReferencedAttachments; + ObjectBuffer.IterateAttachments([&ReferencedAttachments](CbFieldView HashView) { + const IoHash ValueHash = HashView.AsHash(); + ReferencedAttachments.push_back(ValueHash); + }); + ZenCacheStore::PutResult PutResult = m_CacheStore.Put(Context, + Namespace, + Key.Bucket, + Key.Hash, + {.Value = Record.CacheValue}, + ReferencedAttachments, + Overwrite, + nullptr); + if (PutResult.Status != zen::PutStatus::Success) + { + return; + } + m_CacheStats.WriteCount++; + } + }; m_UpstreamCache.GetCacheRecords(Namespace, UpstreamRecordRequests, std::move(OnCacheRecordGetComplete)); } @@ -1774,20 +1838,26 @@ CacheRpcHandler::GetUpstreamCacheChunks(const CacheRequestContext& Context, bool StoreLocal = EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::StoreLocal) && AreDiskWritesAllowed(); if (StoreLocal) { + bool Overwrite = !EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::QueryLocal); if (Request.IsRecordRequest) { m_CidStore.AddChunk(Params.Value, Params.RawHash); } else { - m_CacheStore.Put(Context, - Namespace, - Key.Key.Bucket, - Key.Key.Hash, - {.Value = Params.Value, .RawSize = Params.RawSize, .RawHash = Params.RawHash}, - {}, - nullptr); - m_CacheStats.WriteCount++; + ZenCacheStore::PutResult PutResult = + m_CacheStore.Put(Context, + Namespace, + Key.Key.Bucket, + Key.Key.Hash, + {.Value = Params.Value, .RawSize = Params.RawSize, .RawHash = Params.RawHash}, + {}, + Overwrite, + nullptr); + if (PutResult.Status == zen::PutStatus::Success) + { + m_CacheStats.WriteCount++; + } } } if (!EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::SkipData)) |