diff options
Diffstat (limited to 'src/zenstore')
| -rw-r--r-- | src/zenstore/cache/cacherpc.cpp | 401 | ||||
| -rw-r--r-- | src/zenstore/include/zenstore/cache/cacherpc.h | 5 |
2 files changed, 365 insertions, 41 deletions
diff --git a/src/zenstore/cache/cacherpc.cpp b/src/zenstore/cache/cacherpc.cpp index 3cb1b03a5..4d9bda601 100644 --- a/src/zenstore/cache/cacherpc.cpp +++ b/src/zenstore/cache/cacherpc.cpp @@ -231,6 +231,14 @@ CacheRpcHandler::HandleRpcRequest(const CacheRequestContext& Context, { OutResultPackage = HandleRpcGetCacheChunks(Context, OutAcceptFlags, Object); } + else if (Method == "PutRecordAttachments"sv) + { + OutResultPackage = HandleRpcPutRecordAttachments(Context, Package); + } + else if (Method == "GetRecordAttachments"sv) + { + OutResultPackage = HandleRpcGetRecordAttachments(Context, Object); + } else { m_CacheStats.BadRequestCount++; @@ -240,6 +248,264 @@ CacheRpcHandler::HandleRpcRequest(const CacheRequestContext& Context, } CbPackage +CacheRpcHandler::HandleRpcGetRecordAttachments(const CacheRequestContext& Context, CbObjectView BatchRequest) +{ + ZEN_TRACE_CPU("Z$::RpcGetRecordAttachments"); + ZEN_ASSERT(BatchRequest["Method"sv].AsString() == "GetRecordAttachments"sv); + m_CacheStats.RpcRecordRequests.fetch_add(1); + + CbObjectView Params = BatchRequest["Params"sv].AsObjectView(); + CachePolicy DefaultPolicy; + + m_CacheStats.RpcRecordRequests.fetch_add(1); + + std::string_view PolicyText = Params["DefaultPolicy"].AsString(); + std::optional<std::string> Namespace = GetRpcRequestNamespace(Params); + if (!Namespace) + { + return CbPackage{}; + } + DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default; + + std::vector<std::vector<std::pair<IoHash, CompressedBuffer>>> Results; + + CbArrayView RequestsArray = Params["Requests"sv].AsArrayView(); + for (CbFieldView RequestField : RequestsArray) + { + // m_CacheStats.RpcRecordBatchRequests.fetch_add(1); + CbObjectView RequestObject = RequestField.AsObjectView(); + CbObjectView KeyView = RequestObject["Key"sv].AsObjectView(); + CacheRecordPolicy Policy = LoadCacheRecordPolicy(RequestObject["Policy"sv].AsObjectView(), DefaultPolicy); + CacheKey Key; + if (!GetRpcRequestCacheKey(KeyView, Key)) + { + return CbPackage{}; + } + CbArrayView ChunkHashes = RequestObject["Chunks"sv].AsArrayView(); + + ZenCacheValue Record; + if (!m_CacheStore.Get(Context, *Namespace, Key.Bucket, Key.Hash, Record)) + { + ZEN_WARN("GETRECORDATTACHMENTS - '{}/{}/{}' '{}' FAILED, record expected to exist", + *Namespace, + Key.Bucket, + Key.Hash, + ToString(ZenContentType::kCbPackage)); + } + else + { + Record = {}; + + Stopwatch Timer; + + bool SkipData = EnumHasAllFlags(Policy.GetRecordPolicy(), CachePolicy::SkipData); + uint64_t TransferredSize = 0; + std::vector<std::pair<IoHash, CompressedBuffer>> ChunkResults; + size_t FoundCount = 0; + for (CbFieldView ChunkHashView : ChunkHashes) + { + IoHash AttachmentHash = ChunkHashView.AsHash(); + if (SkipData) + { + if (m_CidStore.ContainsChunk(AttachmentHash)) + { + ChunkResults.push_back(std::make_pair(AttachmentHash, CompressedBuffer{})); + } + else + { + ChunkResults.push_back(std::make_pair(IoHash::Zero, CompressedBuffer{})); + } + } + else + { + if (IoBuffer Chunk = m_CidStore.FindChunkByCid(AttachmentHash); Chunk) + { + TransferredSize += Chunk.GetSize(); + ChunkResults.push_back( + std::make_pair(AttachmentHash, CompressedBuffer::FromCompressedNoValidate(std::move(Chunk)))); + FoundCount++; + } + else + { + ChunkResults.push_back(std::make_pair(AttachmentHash, CompressedBuffer{})); + } + } + } + + // TODO: true/false based on Valid vs Total and policy for partials? + Results.push_back(ChunkResults); + + ZEN_DEBUG("GETRECORDATTACHMENTS - '{}' {}, attachments '{}/{}/{}' (total/found/missing) in {}", + *Namespace, + NiceBytes(TransferredSize), + ChunkHashes.Num(), + FoundCount, + ChunkHashes.Num() - FoundCount, + NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); + } + } + + if (Results.empty()) + { + return CbPackage{}; + } + + CbPackage RpcResponse; + CbObjectWriter ResponseObject; + ResponseObject.BeginArray("Result"sv); + for (const std::vector<std::pair<IoHash, CompressedBuffer>>& Result : Results) + { + ResponseObject.BeginObject(); + ResponseObject.BeginArray("Chunks"sv); + for (const auto& Chunk : Result) + { + ResponseObject.AddHash(Chunk.first); + if (Chunk.first != IoHash::Zero) + { + RpcResponse.AddAttachment(CbAttachment(Chunk.second, Chunk.first)); + } + } + ResponseObject.EndArray(); // Chunks + ResponseObject.EndObject(); + } + ResponseObject.EndArray(); // Result + + RpcResponse.SetObject(ResponseObject.Save()); + return RpcResponse; +} + +CbPackage +CacheRpcHandler::HandleRpcPutRecordAttachments(const CacheRequestContext& Context, const CbPackage& BatchRequest) +{ + ZEN_TRACE_CPU("Z$::RpcPutRecordAttachments"); + + CbObjectView BatchObject = BatchRequest.GetObject(); + ZEN_ASSERT(BatchObject["Method"sv].AsString() == "PutRecordAttachments"sv); + + CbObjectView Params = BatchObject["Params"sv].AsObjectView(); + CachePolicy DefaultPolicy; + + // m_CacheStats.RpcRecordRequests.fetch_add(1); + + std::string_view PolicyText = Params["DefaultPolicy"].AsString(); + std::optional<std::string> Namespace = GetRpcRequestNamespace(Params); + if (!Namespace) + { + return CbPackage{}; + } + DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default; + + std::vector<std::vector<bool>> Results; + + CbArrayView RequestsArray = Params["Requests"sv].AsArrayView(); + for (CbFieldView RequestField : RequestsArray) + { + // m_CacheStats.RpcRecordBatchRequests.fetch_add(1); + CbObjectView RequestObject = RequestField.AsObjectView(); + CbObjectView KeyView = RequestObject["Key"sv].AsObjectView(); + CacheRecordPolicy Policy = LoadCacheRecordPolicy(RequestObject["Policy"sv].AsObjectView(), DefaultPolicy); + ZEN_UNUSED(Policy); + CacheKey Key; + if (!GetRpcRequestCacheKey(KeyView, Key)) + { + return CbPackage{}; + } + CbArrayView ChunkHashes = RequestObject["Chunks"sv].AsArrayView(); + + ZenCacheValue Record; + if (!m_CacheStore.Get(Context, *Namespace, Key.Bucket, Key.Hash, Record)) + { + ZEN_WARN("PUTRECORDATTACHMENTS - '{}/{}/{}' '{}' FAILED, record expected to exist, {} attachments dropped", + *Namespace, + Key.Bucket, + Key.Hash, + ToString(ZenContentType::kCbPackage), + ChunkHashes.Num()); + Results.push_back({}); + } + else + { + Record = {}; + + Stopwatch Timer; + + AttachmentCount Count; + Count.Total = gsl::narrow<uint32_t>(ChunkHashes.Num()); + + uint64_t TransferredSize = 0; + + std::vector<bool> ChunkResults; + for (CbFieldView ChunkHashView : ChunkHashes) + { + IoHash AttachmentHash = ChunkHashView.AsHash(); + + const CbAttachment* Attachment = BatchRequest.FindAttachment(AttachmentHash); + if (Attachment && Attachment->IsCompressedBinary()) + { + CompressedBuffer Chunk = Attachment->AsCompressedBinary(); + CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Chunk.GetCompressed().Flatten().AsIoBuffer(), AttachmentHash); + if (InsertResult.New) + { + Count.New++; + } + Count.Valid++; + TransferredSize += Chunk.GetCompressedSize(); + ChunkResults.push_back(true); + } + else + { + ZEN_WARN("PUTRECORDATTACHMENTS - '{}/{}/{}' '{}' FAILED, attachment '{}' is not compressed", + *Namespace, + Key.Bucket, + Key.Hash, + ToString(ZenContentType::kCbPackage), + AttachmentHash); + Count.Invalid++; + ChunkResults.push_back(false); + } + } + + // TODO: true/false based on Valid vs Total and policy for partials? + Results.push_back(std::move(ChunkResults)); + + ZEN_DEBUG("PUTRECORDATTACHMENTS - '{}' {}, attachments '{}/{}/{}' (new/valid/total) in {}", + *Namespace, + NiceBytes(TransferredSize), + Count.New, + Count.Valid, + Count.Total, + NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); + } + } + + if (Results.empty()) + { + return CbPackage{}; + } + + CbObjectWriter ResponseObject; + ResponseObject.BeginArray("Result"sv); + for (const auto& Result : Results) + { + ResponseObject.BeginObject(); + { + ResponseObject.BeginArray("Chunks"); + for (bool Success : Result) + { + ResponseObject.AddBool(Success); + } + ResponseObject.EndArray(); + } + ResponseObject.EndObject(); + } + ResponseObject.EndArray(); + + CbPackage RpcResponse; + RpcResponse.SetObject(ResponseObject.Save()); + return RpcResponse; +} + +CbPackage CacheRpcHandler::HandleRpcPutCacheRecords(const CacheRequestContext& Context, const CbPackage& BatchRequest) { ZEN_TRACE_CPU("Z$::RpcPutCacheRecords"); @@ -260,7 +526,8 @@ CacheRpcHandler::HandleRpcPutCacheRecords(const CacheRequestContext& Context, co } DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default; - std::vector<bool> Results; + std::vector<bool> Results; + std::unordered_set<IoHash> Needs; CbArrayView RequestsArray = Params["Requests"sv].AsArrayView(); for (CbFieldView RequestField : RequestsArray) @@ -269,6 +536,12 @@ CacheRpcHandler::HandleRpcPutCacheRecords(const CacheRequestContext& Context, co CbObjectView RequestObject = RequestField.AsObjectView(); CbObjectView RecordObject = RequestObject["Record"sv].AsObjectView(); CbObjectView KeyView = RecordObject["Key"sv].AsObjectView(); + if (!KeyView) + { + // The old standard way is to store the record key in the object, but for oplog store we don't do that + // So in that case we set the ky in the request object instead + KeyView = RequestObject["Key"sv].AsObjectView(); + } CacheKey Key; if (!GetRpcRequestCacheKey(KeyView, Key)) @@ -282,13 +555,14 @@ CacheRpcHandler::HandleRpcPutCacheRecords(const CacheRequestContext& Context, co .Policy = std::move(Policy), .Context = Context}; - PutResult Result = PutCacheRecord(PutRequest, &BatchRequest); - + std::vector<IoHash> MissingAttachments; + PutResult Result = PutCacheRecord(PutRequest, &BatchRequest, MissingAttachments); if (Result == PutResult::Invalid) { return CbPackage{}; } Results.push_back(Result == PutResult::Success); + Needs.insert(MissingAttachments.begin(), MissingAttachments.end()); } if (Results.empty()) { @@ -297,19 +571,31 @@ CacheRpcHandler::HandleRpcPutCacheRecords(const CacheRequestContext& Context, co CbObjectWriter ResponseObject; ResponseObject.BeginArray("Result"sv); - for (bool Value : Results) + for (bool Success : Results) { - ResponseObject.AddBool(Value); + ResponseObject.AddBool(Success); } ResponseObject.EndArray(); + if (!Needs.empty()) + { + ResponseObject.BeginArray("needs"sv); + { + for (const IoHash& RawHash : Needs) + { + ResponseObject.AddHash(RawHash); + } + } + ResponseObject.EndArray(); // needs + } + CbPackage RpcResponse; RpcResponse.SetObject(ResponseObject.Save()); return RpcResponse; } PutResult -CacheRpcHandler::PutCacheRecord(PutRequestData& Request, const CbPackage* Package) +CacheRpcHandler::PutCacheRecord(PutRequestData& Request, const CbPackage* Package, std::vector<IoHash>& OutMissingAttachments) { CbObjectView Record = Request.RecordObject; uint64_t RecordObjectSize = Record.GetSize(); @@ -327,49 +613,83 @@ CacheRpcHandler::PutCacheRecord(PutRequestData& Request, const CbPackage* Packag Stopwatch Timer; - Request.RecordObject.IterateAttachments( - [this, &Request, Package, &AttachmentsToStoreLocally, &ValidAttachments, &ReferencedAttachments, &Count, &TransferredSize]( - CbFieldView HashView) { - const IoHash ValueHash = HashView.AsHash(); - ReferencedAttachments.push_back(ValueHash); - if (const CbAttachment* Attachment = Package ? Package->FindAttachment(ValueHash) : nullptr) - { - if (Attachment->IsCompressedBinary()) - { - AttachmentsToStoreLocally.emplace_back(Attachment); - ValidAttachments.emplace_back(ValueHash); - Count.Valid++; - } - else - { - ZEN_WARN("PUTCACHERECORD - '{}/{}/{}' '{}' FAILED, attachment '{}' is not compressed", - Request.Namespace, - Request.Key.Bucket, - Request.Key.Hash, - ToString(ZenContentType::kCbPackage), - ValueHash); - Count.Invalid++; - } - } - else if (m_CidStore.ContainsChunk(ValueHash)) + ZenCacheValue FetchedCacheValue; + if (!Record) + { + if (!m_CacheStore.Get(Request.Context, Request.Namespace, Request.Key.Bucket, Request.Key.Hash, FetchedCacheValue)) + { + // TODO: Log error + m_CacheStats.MissCount++; + return PutResult::Fail; + } + m_CacheStats.HitCount++; + if (FetchedCacheValue.Value.GetContentType() != ZenContentType::kCbObject) + { + // TODO: Log error + return PutResult::Invalid; + } + else + { + Record = CbObjectView(FetchedCacheValue.Value.GetData()); + } + } + + Record.IterateAttachments([this, + &Request, + Package, + &AttachmentsToStoreLocally, + &ValidAttachments, + &ReferencedAttachments, + &OutMissingAttachments, + &Count, + &TransferredSize](CbFieldView HashView) { + const IoHash ValueHash = HashView.AsHash(); + ReferencedAttachments.push_back(ValueHash); + if (const CbAttachment* Attachment = Package ? Package->FindAttachment(ValueHash) : nullptr) + { + if (Attachment->IsCompressedBinary()) { + AttachmentsToStoreLocally.emplace_back(Attachment); ValidAttachments.emplace_back(ValueHash); Count.Valid++; } - Count.Total++; - }); + else + { + ZEN_WARN("PUTCACHERECORD - '{}/{}/{}' '{}' FAILED, attachment '{}' is not compressed", + Request.Namespace, + Request.Key.Bucket, + Request.Key.Hash, + ToString(ZenContentType::kCbPackage), + ValueHash); + Count.Invalid++; + } + } + else if (m_CidStore.ContainsChunk(ValueHash)) + { + ValidAttachments.emplace_back(ValueHash); + Count.Valid++; + } + else + { + OutMissingAttachments.emplace_back(ValueHash); + } + Count.Total++; + }); if (Count.Invalid > 0) { return PutResult::Invalid; } - ZenCacheValue CacheValue; - CacheValue.Value = IoBuffer(Record.GetSize()); - Record.CopyTo(MutableMemoryView(CacheValue.Value.MutableData(), CacheValue.Value.GetSize())); - CacheValue.Value.SetContentType(ZenContentType::kCbObject); - m_CacheStore.Put(Request.Context, Request.Namespace, Request.Key.Bucket, Request.Key.Hash, CacheValue, ReferencedAttachments); - m_CacheStats.WriteCount++; + if (!FetchedCacheValue.Value) + { + ZenCacheValue CacheValue; + CacheValue.Value = IoBuffer(Record.GetSize()); + Record.CopyTo(MutableMemoryView(CacheValue.Value.MutableData(), CacheValue.Value.GetSize())); + CacheValue.Value.SetContentType(ZenContentType::kCbObject); + m_CacheStore.Put(Request.Context, Request.Namespace, Request.Key.Bucket, Request.Key.Hash, CacheValue, ReferencedAttachments); + m_CacheStats.WriteCount++; + } for (const CbAttachment* Attachment : AttachmentsToStoreLocally) { @@ -382,7 +702,7 @@ CacheRpcHandler::PutCacheRecord(PutRequestData& Request, const CbPackage* Packag TransferredSize += Chunk.GetCompressedSize(); } - ZEN_DEBUG("PUTCACHERECORD - '{}/{}/{}' {}, attachments '{}/{}/{}' (new/valid/total) in {}", + ZEN_DEBUG("PUTCACHERECORD - '{}/{}/{}' {}, attachments '{}/{}/{}/{}' (new/valid/total/missing) in {}", Request.Namespace, Request.Key.Bucket, Request.Key.Hash, @@ -390,6 +710,7 @@ CacheRpcHandler::PutCacheRecord(PutRequestData& Request, const CbPackage* Packag Count.New, Count.Valid, Count.Total, + OutMissingAttachments.size(), NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); const bool IsPartialRecord = Count.Valid != Count.Total; diff --git a/src/zenstore/include/zenstore/cache/cacherpc.h b/src/zenstore/include/zenstore/cache/cacherpc.h index a98010a7c..aeed8dce4 100644 --- a/src/zenstore/include/zenstore/cache/cacherpc.h +++ b/src/zenstore/include/zenstore/cache/cacherpc.h @@ -106,7 +106,10 @@ private: CbPackage HandleRpcGetCacheValues(const CacheRequestContext& Context, CbObjectView BatchRequest); CbPackage HandleRpcGetCacheChunks(const CacheRequestContext& Context, RpcAcceptOptions AcceptOptions, CbObjectView BatchRequest); - PutResult PutCacheRecord(PutRequestData& Request, const CbPackage* Package); + CbPackage HandleRpcGetRecordAttachments(const CacheRequestContext& Context, CbObjectView BatchRequest); + CbPackage HandleRpcPutRecordAttachments(const CacheRequestContext& Context, const CbPackage& BatchRequest); + + PutResult PutCacheRecord(PutRequestData& Request, const CbPackage* Package, std::vector<IoHash>& OutMissingAttachments); /** HandleRpcGetCacheChunks Helper: Parse the Body object into RecordValue Requests and Value Requests. */ bool ParseGetCacheChunksRequest(std::string& Namespace, |