diff options
| author | Dan Engelbrecht <[email protected]> | 2024-05-02 10:53:15 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2024-05-02 10:53:15 +0200 |
| commit | 1bafcb32cb48b2256a9d72995388b7df72058039 (patch) | |
| tree | 2385ec3de36c4f45018832eb36bcab6ac2d3670f /src/zenstore/cache/cacherpc.cpp | |
| parent | fix get project files loop (#68) (diff) | |
| download | zen-1bafcb32cb48b2256a9d72995388b7df72058039.tar.xz zen-1bafcb32cb48b2256a9d72995388b7df72058039.zip | |
batch cache put (#67)
- Improvement: Batch scope for put of cache values
Diffstat (limited to 'src/zenstore/cache/cacherpc.cpp')
| -rw-r--r-- | src/zenstore/cache/cacherpc.cpp | 246 |
1 files changed, 149 insertions, 97 deletions
diff --git a/src/zenstore/cache/cacherpc.cpp b/src/zenstore/cache/cacherpc.cpp index 60b2fa1a1..6871dfd56 100644 --- a/src/zenstore/cache/cacherpc.cpp +++ b/src/zenstore/cache/cacherpc.cpp @@ -377,7 +377,7 @@ CacheRpcHandler::PutCacheRecord(PutRequestData& Request, const CbPackage* Packag CacheValue.Value = IoBuffer(Record.GetSize()); Record.CopyTo(MutableMemoryView(CacheValue.Value.MutableData(), CacheValue.Value.GetSize())); CacheValue.Value.SetContentType(ZenContentType::kCbObject); - m_CacheStore.Put(Request.Context, Request.Namespace, Request.Key.Bucket, Request.Key.Hash, CacheValue, ReferencedAttachments); + m_CacheStore.Put(Request.Context, Request.Namespace, Request.Key.Bucket, Request.Key.Hash, CacheValue, ReferencedAttachments, nullptr); m_CacheStats.WriteCount++; if (!WriteAttachmentBuffers.empty()) @@ -652,8 +652,13 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb const IoHash ValueHash = HashView.AsHash(); ReferencedAttachments.push_back(ValueHash); }); - m_CacheStore - .Put(Context, *Namespace, Key.Bucket, Key.Hash, {.Value = {Request.RecordCacheValue}}, ReferencedAttachments); + m_CacheStore.Put(Context, + *Namespace, + Key.Bucket, + Key.Hash, + {.Value = {Request.RecordCacheValue}}, + ReferencedAttachments, + nullptr); m_CacheStats.WriteCount++; } ParseValues(Request); @@ -799,94 +804,139 @@ CacheRpcHandler::HandleRpcPutCacheValues(const CacheRequestContext& Context, con const bool HasUpstream = m_UpstreamCache.IsActive(); CbArrayView RequestsArray = Params["Requests"sv].AsArrayView(); - std::vector<bool> Results; - for (CbFieldView RequestField : RequestsArray) + std::vector<bool> BatchResults; + std::vector<size_t> BatchResultIndexes; + std::vector<bool> Results; + std::vector<CacheKey> UpstreamCacheKeys; { - ZEN_TRACE_CPU("Z$::RpcPutCacheValues::Request"); + Results.reserve(RequestsArray.Num()); + ZenCacheStore::PutBatch Batch(m_CacheStore, *Namespace, BatchResults); + for (CbFieldView RequestField : RequestsArray) + { + ZEN_TRACE_CPU("Z$::RpcPutCacheValues::Request"); - m_CacheStats.RpcValueBatchRequests.fetch_add(1); + m_CacheStats.RpcValueBatchRequests.fetch_add(1); - Stopwatch Timer; + Stopwatch Timer; - CbObjectView RequestObject = RequestField.AsObjectView(); - CbObjectView KeyView = RequestObject["Key"sv].AsObjectView(); + CbObjectView RequestObject = RequestField.AsObjectView(); + CbObjectView KeyView = RequestObject["Key"sv].AsObjectView(); - CacheKey Key; - if (!GetRpcRequestCacheKey(KeyView, Key)) - { - return CbPackage{}; - } + CacheKey Key; + if (!GetRpcRequestCacheKey(KeyView, Key)) + { + return CbPackage{}; + } - PolicyText = RequestObject["Policy"sv].AsString(); - CachePolicy Policy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : DefaultPolicy; - IoHash RawHash = RequestObject["RawHash"sv].AsBinaryAttachment(); - uint64_t RawSize = RequestObject["RawSize"sv].AsUInt64(); - bool Succeeded = false; - uint64_t TransferredSize = 0; + PolicyText = RequestObject["Policy"sv].AsString(); + CachePolicy Policy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : DefaultPolicy; + IoHash RawHash = RequestObject["RawHash"sv].AsBinaryAttachment(); + uint64_t RawSize = RequestObject["RawSize"sv].AsUInt64(); + bool Valid = false; + uint64_t TransferredSize = 0; - if (const CbAttachment* Attachment = BatchRequest.FindAttachment(RawHash)) - { - if (Attachment->IsCompressedBinary()) + if (const CbAttachment* Attachment = BatchRequest.FindAttachment(RawHash)) { - CompressedBuffer Chunk = Attachment->AsCompressedBinary(); - if (EnumHasAllFlags(Policy, CachePolicy::StoreRemote)) + if (Attachment->IsCompressedBinary()) { - // TODO: Implement upstream puts of CacheValues with StoreLocal == false. - // Currently ProcessCacheRecord requires that the value exist in the local cache to put it upstream. - Policy |= CachePolicy::StoreLocal; - } + CompressedBuffer Chunk = Attachment->AsCompressedBinary(); + if (EnumHasAllFlags(Policy, CachePolicy::StoreRemote)) + { + // TODO: Implement upstream puts of CacheValues with StoreLocal == false. + // Currently ProcessCacheRecord requires that the value exist in the local cache to put it upstream. + Policy |= CachePolicy::StoreLocal; + } - if (EnumHasAllFlags(Policy, CachePolicy::StoreLocal)) - { - IoBuffer Value = Chunk.GetCompressed().Flatten().AsIoBuffer(); - Value.SetContentType(ZenContentType::kCompressedBinary); - if (RawSize == 0) + if (EnumHasAllFlags(Policy, CachePolicy::StoreLocal)) { - RawSize = Chunk.DecodeRawSize(); + IoBuffer Value = Chunk.GetCompressed().Flatten().AsIoBuffer(); + Value.SetContentType(ZenContentType::kCompressedBinary); + if (RawSize == 0) + { + RawSize = Chunk.DecodeRawSize(); + } + m_CacheStore.Put(Context, + *Namespace, + Key.Bucket, + Key.Hash, + {.Value = Value, .RawSize = RawSize, .RawHash = RawHash}, + {}, + &Batch); + m_CacheStats.WriteCount++; + TransferredSize = Chunk.GetCompressedSize(); + BatchResultIndexes.push_back(Results.size()); + Results.push_back(false); } - m_CacheStore - .Put(Context, *Namespace, Key.Bucket, Key.Hash, {.Value = Value, .RawSize = RawSize, .RawHash = RawHash}, {}); - m_CacheStats.WriteCount++; - TransferredSize = Chunk.GetCompressedSize(); + else + { + Results.push_back(true); + } + Valid = true; + } + else + { + ZEN_WARN("PUTCACHEVALUES - '{}/{}/{}/{}' FAILED, value is not compressed", *Namespace, Key.Bucket, Key.Hash, RawHash); + return CbPackage{}; } - Succeeded = true; } - else + else if (EnumHasAllFlags(Policy, CachePolicy::QueryLocal)) { - ZEN_WARN("PUTCACHEVALUES - '{}/{}/{}/{}' FAILED, value is not compressed", *Namespace, Key.Bucket, Key.Hash, RawHash); - return CbPackage{}; + ZenCacheValue ExistingValue; + if (m_CacheStore.Get(Context, *Namespace, Key.Bucket, Key.Hash, ExistingValue) && + IsCompressedBinary(ExistingValue.Value.GetContentType())) + { + Results.push_back(true); + Valid = true; + } + else + { + Results.push_back(false); + } } - } - else if (EnumHasAllFlags(Policy, CachePolicy::QueryLocal)) - { - ZenCacheValue ExistingValue; - if (m_CacheStore.Get(Context, *Namespace, Key.Bucket, Key.Hash, ExistingValue) && - IsCompressedBinary(ExistingValue.Value.GetContentType())) + // We do not search the Upstream. No data in a put means the caller is probing for whether they need to do a heavy put. + // If it doesn't exist locally they should do the heavy put rather than having us fetch it from upstream. + + if (HasUpstream && EnumHasAllFlags(Policy, CachePolicy::StoreRemote)) { - Succeeded = true; + UpstreamCacheKeys.push_back(Key); + } + else + { + UpstreamCacheKeys.push_back(CacheKey::Empty); } - } - // We do not search the Upstream. No data in a put means the caller is probing for whether they need to do a heavy put. - // If it doesn't exist locally they should do the heavy put rather than having us fetch it from upstream. - if (HasUpstream && Succeeded && EnumHasAllFlags(Policy, CachePolicy::StoreRemote)) - { - m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCompressedBinary, .Namespace = *Namespace, .Key = Key}); + ZEN_DEBUG("PUTCACHEVALUES - '{}/{}/{}' {}, '{}' in {}", + *Namespace, + Key.Bucket, + Key.Hash, + NiceBytes(TransferredSize), + Valid ? "Added"sv : "Invalid", + NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); } - Results.push_back(Succeeded); - ZEN_DEBUG("PUTCACHEVALUES - '{}/{}/{}' {}, '{}' in {}", - *Namespace, - Key.Bucket, - Key.Hash, - NiceBytes(TransferredSize), - Succeeded ? "Added"sv : "Invalid", - NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); } if (Results.empty()) { return CbPackage{}; } + ZEN_ASSERT(UpstreamCacheKeys.size() == Results.size()); + ZEN_ASSERT(BatchResults.size() == BatchResultIndexes.size()); + for (size_t Index = 0; Index < BatchResults.size(); Index++) + { + size_t BatchResultIndex = BatchResultIndexes[Index]; + ZEN_ASSERT(BatchResultIndex < Results.size()); + ZEN_ASSERT(Results[BatchResultIndex] == false); + Results[BatchResultIndex] = BatchResults[Index]; + } + + for (std::size_t Index = 0; Index < Results.size(); Index++) + { + if (Results[Index] && UpstreamCacheKeys[Index] != CacheKey::Empty) + { + m_UpstreamCache.EnqueueUpstream( + {.Type = ZenContentType::kCompressedBinary, .Namespace = *Namespace, .Key = UpstreamCacheKeys[Index]}); + } + } { ZEN_TRACE_CPU("Z$::RpcPutCacheValues::Response"); CbObjectWriter ResponseObject; @@ -1046,7 +1096,8 @@ CacheRpcHandler::HandleRpcGetCacheValues(const CacheRequestContext& Context, CbO Request.Key.Bucket, Request.Key.Hash, ZenCacheValue{.Value = Params.Value, .RawSize = Request.RawSize, .RawHash = Request.RawHash}, - {}); + {}, + nullptr); m_CacheStats.WriteCount++; } @@ -1314,36 +1365,36 @@ CacheRpcHandler::GetLocalCacheRecords(const CacheRequestContext& Context, if (!UpstreamRecordRequests.empty()) { - const auto OnCacheRecordGetComplete = - [this, Namespace, &RecordKeys, &Records, &RecordRequests, Context](CacheRecordGetCompleteParams&& Params) { - if (!Params.Record) - { - return; - } - CacheKeyRequest& RecordKey = Params.Request; - size_t RecordIndex = std::distance(RecordKeys.data(), &RecordKey); - RecordRequests[RecordIndex]->ElapsedTimeUs += static_cast<uint64_t>(Params.ElapsedSeconds * 1000000.0); - RecordBody& Record = Records[RecordIndex]; - - const CacheKey& Key = RecordKey.Key; - Record.Exists = true; - CbObject ObjectBuffer = CbObject::Clone(Params.Record); - Record.CacheValue = ObjectBuffer.GetBuffer().AsIoBuffer(); - Record.CacheValue.SetContentType(ZenContentType::kCbObject); - Record.Source = Params.Source; - - bool StoreLocal = EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::StoreLocal) && AreDiskWritesAllowed(); - if (StoreLocal) - { - std::vector<IoHash> ReferencedAttachments; - ObjectBuffer.IterateAttachments([&ReferencedAttachments](CbFieldView HashView) { - const IoHash ValueHash = HashView.AsHash(); - ReferencedAttachments.push_back(ValueHash); - }); - m_CacheStore.Put(Context, Namespace, Key.Bucket, Key.Hash, {.Value = Record.CacheValue}, ReferencedAttachments); - m_CacheStats.WriteCount++; - } - }; + const auto OnCacheRecordGetComplete = [this, Namespace, &RecordKeys, &Records, &RecordRequests, Context]( + CacheRecordGetCompleteParams&& Params) { + if (!Params.Record) + { + return; + } + CacheKeyRequest& RecordKey = Params.Request; + size_t RecordIndex = std::distance(RecordKeys.data(), &RecordKey); + RecordRequests[RecordIndex]->ElapsedTimeUs += static_cast<uint64_t>(Params.ElapsedSeconds * 1000000.0); + RecordBody& Record = Records[RecordIndex]; + + const CacheKey& Key = RecordKey.Key; + Record.Exists = true; + CbObject ObjectBuffer = CbObject::Clone(Params.Record); + Record.CacheValue = ObjectBuffer.GetBuffer().AsIoBuffer(); + Record.CacheValue.SetContentType(ZenContentType::kCbObject); + Record.Source = Params.Source; + + bool StoreLocal = EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::StoreLocal) && AreDiskWritesAllowed(); + if (StoreLocal) + { + std::vector<IoHash> ReferencedAttachments; + ObjectBuffer.IterateAttachments([&ReferencedAttachments](CbFieldView HashView) { + const IoHash ValueHash = HashView.AsHash(); + ReferencedAttachments.push_back(ValueHash); + }); + m_CacheStore.Put(Context, Namespace, Key.Bucket, Key.Hash, {.Value = Record.CacheValue}, ReferencedAttachments, nullptr); + m_CacheStats.WriteCount++; + } + }; m_UpstreamCache.GetCacheRecords(Namespace, UpstreamRecordRequests, std::move(OnCacheRecordGetComplete)); } @@ -1531,7 +1582,8 @@ CacheRpcHandler::GetUpstreamCacheChunks(const CacheRequestContext& Context, Key.Key.Bucket, Key.Key.Hash, {.Value = Params.Value, .RawSize = Params.RawSize, .RawHash = Params.RawHash}, - {}); + {}, + nullptr); m_CacheStats.WriteCount++; } } |