aboutsummaryrefslogtreecommitdiff
path: root/src/zenstore
diff options
context:
space:
mode:
Diffstat (limited to 'src/zenstore')
-rw-r--r--src/zenstore/cache/cacherpc.cpp401
-rw-r--r--src/zenstore/include/zenstore/cache/cacherpc.h5
2 files changed, 365 insertions, 41 deletions
diff --git a/src/zenstore/cache/cacherpc.cpp b/src/zenstore/cache/cacherpc.cpp
index 3cb1b03a5..4d9bda601 100644
--- a/src/zenstore/cache/cacherpc.cpp
+++ b/src/zenstore/cache/cacherpc.cpp
@@ -231,6 +231,14 @@ CacheRpcHandler::HandleRpcRequest(const CacheRequestContext& Context,
{
OutResultPackage = HandleRpcGetCacheChunks(Context, OutAcceptFlags, Object);
}
+ else if (Method == "PutRecordAttachments"sv)
+ {
+ OutResultPackage = HandleRpcPutRecordAttachments(Context, Package);
+ }
+ else if (Method == "GetRecordAttachments"sv)
+ {
+ OutResultPackage = HandleRpcGetRecordAttachments(Context, Object);
+ }
else
{
m_CacheStats.BadRequestCount++;
@@ -240,6 +248,264 @@ CacheRpcHandler::HandleRpcRequest(const CacheRequestContext& Context,
}
CbPackage
+CacheRpcHandler::HandleRpcGetRecordAttachments(const CacheRequestContext& Context, CbObjectView BatchRequest)
+{
+ ZEN_TRACE_CPU("Z$::RpcGetRecordAttachments");
+ ZEN_ASSERT(BatchRequest["Method"sv].AsString() == "GetRecordAttachments"sv);
+ m_CacheStats.RpcRecordRequests.fetch_add(1);
+
+ CbObjectView Params = BatchRequest["Params"sv].AsObjectView();
+ CachePolicy DefaultPolicy;
+
+ m_CacheStats.RpcRecordRequests.fetch_add(1);
+
+ std::string_view PolicyText = Params["DefaultPolicy"].AsString();
+ std::optional<std::string> Namespace = GetRpcRequestNamespace(Params);
+ if (!Namespace)
+ {
+ return CbPackage{};
+ }
+ DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default;
+
+ std::vector<std::vector<std::pair<IoHash, CompressedBuffer>>> Results;
+
+ CbArrayView RequestsArray = Params["Requests"sv].AsArrayView();
+ for (CbFieldView RequestField : RequestsArray)
+ {
+ // m_CacheStats.RpcRecordBatchRequests.fetch_add(1);
+ CbObjectView RequestObject = RequestField.AsObjectView();
+ CbObjectView KeyView = RequestObject["Key"sv].AsObjectView();
+ CacheRecordPolicy Policy = LoadCacheRecordPolicy(RequestObject["Policy"sv].AsObjectView(), DefaultPolicy);
+ CacheKey Key;
+ if (!GetRpcRequestCacheKey(KeyView, Key))
+ {
+ return CbPackage{};
+ }
+ CbArrayView ChunkHashes = RequestObject["Chunks"sv].AsArrayView();
+
+ ZenCacheValue Record;
+ if (!m_CacheStore.Get(Context, *Namespace, Key.Bucket, Key.Hash, Record))
+ {
+ ZEN_WARN("GETRECORDATTACHMENTS - '{}/{}/{}' '{}' FAILED, record expected to exist",
+ *Namespace,
+ Key.Bucket,
+ Key.Hash,
+ ToString(ZenContentType::kCbPackage));
+ }
+ else
+ {
+ Record = {};
+
+ Stopwatch Timer;
+
+ bool SkipData = EnumHasAllFlags(Policy.GetRecordPolicy(), CachePolicy::SkipData);
+ uint64_t TransferredSize = 0;
+ std::vector<std::pair<IoHash, CompressedBuffer>> ChunkResults;
+ size_t FoundCount = 0;
+ for (CbFieldView ChunkHashView : ChunkHashes)
+ {
+ IoHash AttachmentHash = ChunkHashView.AsHash();
+ if (SkipData)
+ {
+ if (m_CidStore.ContainsChunk(AttachmentHash))
+ {
+ ChunkResults.push_back(std::make_pair(AttachmentHash, CompressedBuffer{}));
+ }
+ else
+ {
+ ChunkResults.push_back(std::make_pair(IoHash::Zero, CompressedBuffer{}));
+ }
+ }
+ else
+ {
+ if (IoBuffer Chunk = m_CidStore.FindChunkByCid(AttachmentHash); Chunk)
+ {
+ TransferredSize += Chunk.GetSize();
+ ChunkResults.push_back(
+ std::make_pair(AttachmentHash, CompressedBuffer::FromCompressedNoValidate(std::move(Chunk))));
+ FoundCount++;
+ }
+ else
+ {
+ ChunkResults.push_back(std::make_pair(AttachmentHash, CompressedBuffer{}));
+ }
+ }
+ }
+
+ // TODO: true/false based on Valid vs Total and policy for partials?
+ Results.push_back(ChunkResults);
+
+ ZEN_DEBUG("GETRECORDATTACHMENTS - '{}' {}, attachments '{}/{}/{}' (total/found/missing) in {}",
+ *Namespace,
+ NiceBytes(TransferredSize),
+ ChunkHashes.Num(),
+ FoundCount,
+ ChunkHashes.Num() - FoundCount,
+ NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
+ }
+ }
+
+ if (Results.empty())
+ {
+ return CbPackage{};
+ }
+
+ CbPackage RpcResponse;
+ CbObjectWriter ResponseObject;
+ ResponseObject.BeginArray("Result"sv);
+ for (const std::vector<std::pair<IoHash, CompressedBuffer>>& Result : Results)
+ {
+ ResponseObject.BeginObject();
+ ResponseObject.BeginArray("Chunks"sv);
+ for (const auto& Chunk : Result)
+ {
+ ResponseObject.AddHash(Chunk.first);
+ if (Chunk.first != IoHash::Zero)
+ {
+ RpcResponse.AddAttachment(CbAttachment(Chunk.second, Chunk.first));
+ }
+ }
+ ResponseObject.EndArray(); // Chunks
+ ResponseObject.EndObject();
+ }
+ ResponseObject.EndArray(); // Result
+
+ RpcResponse.SetObject(ResponseObject.Save());
+ return RpcResponse;
+}
+
+CbPackage
+CacheRpcHandler::HandleRpcPutRecordAttachments(const CacheRequestContext& Context, const CbPackage& BatchRequest)
+{
+ ZEN_TRACE_CPU("Z$::RpcPutRecordAttachments");
+
+ CbObjectView BatchObject = BatchRequest.GetObject();
+ ZEN_ASSERT(BatchObject["Method"sv].AsString() == "PutRecordAttachments"sv);
+
+ CbObjectView Params = BatchObject["Params"sv].AsObjectView();
+ CachePolicy DefaultPolicy;
+
+ // m_CacheStats.RpcRecordRequests.fetch_add(1);
+
+ std::string_view PolicyText = Params["DefaultPolicy"].AsString();
+ std::optional<std::string> Namespace = GetRpcRequestNamespace(Params);
+ if (!Namespace)
+ {
+ return CbPackage{};
+ }
+ DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default;
+
+ std::vector<std::vector<bool>> Results;
+
+ CbArrayView RequestsArray = Params["Requests"sv].AsArrayView();
+ for (CbFieldView RequestField : RequestsArray)
+ {
+ // m_CacheStats.RpcRecordBatchRequests.fetch_add(1);
+ CbObjectView RequestObject = RequestField.AsObjectView();
+ CbObjectView KeyView = RequestObject["Key"sv].AsObjectView();
+ CacheRecordPolicy Policy = LoadCacheRecordPolicy(RequestObject["Policy"sv].AsObjectView(), DefaultPolicy);
+ ZEN_UNUSED(Policy);
+ CacheKey Key;
+ if (!GetRpcRequestCacheKey(KeyView, Key))
+ {
+ return CbPackage{};
+ }
+ CbArrayView ChunkHashes = RequestObject["Chunks"sv].AsArrayView();
+
+ ZenCacheValue Record;
+ if (!m_CacheStore.Get(Context, *Namespace, Key.Bucket, Key.Hash, Record))
+ {
+ ZEN_WARN("PUTRECORDATTACHMENTS - '{}/{}/{}' '{}' FAILED, record expected to exist, {} attachments dropped",
+ *Namespace,
+ Key.Bucket,
+ Key.Hash,
+ ToString(ZenContentType::kCbPackage),
+ ChunkHashes.Num());
+ Results.push_back({});
+ }
+ else
+ {
+ Record = {};
+
+ Stopwatch Timer;
+
+ AttachmentCount Count;
+ Count.Total = gsl::narrow<uint32_t>(ChunkHashes.Num());
+
+ uint64_t TransferredSize = 0;
+
+ std::vector<bool> ChunkResults;
+ for (CbFieldView ChunkHashView : ChunkHashes)
+ {
+ IoHash AttachmentHash = ChunkHashView.AsHash();
+
+ const CbAttachment* Attachment = BatchRequest.FindAttachment(AttachmentHash);
+ if (Attachment && Attachment->IsCompressedBinary())
+ {
+ CompressedBuffer Chunk = Attachment->AsCompressedBinary();
+ CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Chunk.GetCompressed().Flatten().AsIoBuffer(), AttachmentHash);
+ if (InsertResult.New)
+ {
+ Count.New++;
+ }
+ Count.Valid++;
+ TransferredSize += Chunk.GetCompressedSize();
+ ChunkResults.push_back(true);
+ }
+ else
+ {
+ ZEN_WARN("PUTRECORDATTACHMENTS - '{}/{}/{}' '{}' FAILED, attachment '{}' is not compressed",
+ *Namespace,
+ Key.Bucket,
+ Key.Hash,
+ ToString(ZenContentType::kCbPackage),
+ AttachmentHash);
+ Count.Invalid++;
+ ChunkResults.push_back(false);
+ }
+ }
+
+ // TODO: true/false based on Valid vs Total and policy for partials?
+ Results.push_back(std::move(ChunkResults));
+
+ ZEN_DEBUG("PUTRECORDATTACHMENTS - '{}' {}, attachments '{}/{}/{}' (new/valid/total) in {}",
+ *Namespace,
+ NiceBytes(TransferredSize),
+ Count.New,
+ Count.Valid,
+ Count.Total,
+ NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
+ }
+ }
+
+ if (Results.empty())
+ {
+ return CbPackage{};
+ }
+
+ CbObjectWriter ResponseObject;
+ ResponseObject.BeginArray("Result"sv);
+ for (const auto& Result : Results)
+ {
+ ResponseObject.BeginObject();
+ {
+ ResponseObject.BeginArray("Chunks");
+ for (bool Success : Result)
+ {
+ ResponseObject.AddBool(Success);
+ }
+ ResponseObject.EndArray();
+ }
+ ResponseObject.EndObject();
+ }
+ ResponseObject.EndArray();
+
+ CbPackage RpcResponse;
+ RpcResponse.SetObject(ResponseObject.Save());
+ return RpcResponse;
+}
+
+CbPackage
CacheRpcHandler::HandleRpcPutCacheRecords(const CacheRequestContext& Context, const CbPackage& BatchRequest)
{
ZEN_TRACE_CPU("Z$::RpcPutCacheRecords");
@@ -260,7 +526,8 @@ CacheRpcHandler::HandleRpcPutCacheRecords(const CacheRequestContext& Context, co
}
DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default;
- std::vector<bool> Results;
+ std::vector<bool> Results;
+ std::unordered_set<IoHash> Needs;
CbArrayView RequestsArray = Params["Requests"sv].AsArrayView();
for (CbFieldView RequestField : RequestsArray)
@@ -269,6 +536,12 @@ CacheRpcHandler::HandleRpcPutCacheRecords(const CacheRequestContext& Context, co
CbObjectView RequestObject = RequestField.AsObjectView();
CbObjectView RecordObject = RequestObject["Record"sv].AsObjectView();
CbObjectView KeyView = RecordObject["Key"sv].AsObjectView();
+ if (!KeyView)
+ {
+ // The old standard way is to store the record key in the object, but for oplog store we don't do that
+ // So in that case we set the ky in the request object instead
+ KeyView = RequestObject["Key"sv].AsObjectView();
+ }
CacheKey Key;
if (!GetRpcRequestCacheKey(KeyView, Key))
@@ -282,13 +555,14 @@ CacheRpcHandler::HandleRpcPutCacheRecords(const CacheRequestContext& Context, co
.Policy = std::move(Policy),
.Context = Context};
- PutResult Result = PutCacheRecord(PutRequest, &BatchRequest);
-
+ std::vector<IoHash> MissingAttachments;
+ PutResult Result = PutCacheRecord(PutRequest, &BatchRequest, MissingAttachments);
if (Result == PutResult::Invalid)
{
return CbPackage{};
}
Results.push_back(Result == PutResult::Success);
+ Needs.insert(MissingAttachments.begin(), MissingAttachments.end());
}
if (Results.empty())
{
@@ -297,19 +571,31 @@ CacheRpcHandler::HandleRpcPutCacheRecords(const CacheRequestContext& Context, co
CbObjectWriter ResponseObject;
ResponseObject.BeginArray("Result"sv);
- for (bool Value : Results)
+ for (bool Success : Results)
{
- ResponseObject.AddBool(Value);
+ ResponseObject.AddBool(Success);
}
ResponseObject.EndArray();
+ if (!Needs.empty())
+ {
+ ResponseObject.BeginArray("needs"sv);
+ {
+ for (const IoHash& RawHash : Needs)
+ {
+ ResponseObject.AddHash(RawHash);
+ }
+ }
+ ResponseObject.EndArray(); // needs
+ }
+
CbPackage RpcResponse;
RpcResponse.SetObject(ResponseObject.Save());
return RpcResponse;
}
PutResult
-CacheRpcHandler::PutCacheRecord(PutRequestData& Request, const CbPackage* Package)
+CacheRpcHandler::PutCacheRecord(PutRequestData& Request, const CbPackage* Package, std::vector<IoHash>& OutMissingAttachments)
{
CbObjectView Record = Request.RecordObject;
uint64_t RecordObjectSize = Record.GetSize();
@@ -327,49 +613,83 @@ CacheRpcHandler::PutCacheRecord(PutRequestData& Request, const CbPackage* Packag
Stopwatch Timer;
- Request.RecordObject.IterateAttachments(
- [this, &Request, Package, &AttachmentsToStoreLocally, &ValidAttachments, &ReferencedAttachments, &Count, &TransferredSize](
- CbFieldView HashView) {
- const IoHash ValueHash = HashView.AsHash();
- ReferencedAttachments.push_back(ValueHash);
- if (const CbAttachment* Attachment = Package ? Package->FindAttachment(ValueHash) : nullptr)
- {
- if (Attachment->IsCompressedBinary())
- {
- AttachmentsToStoreLocally.emplace_back(Attachment);
- ValidAttachments.emplace_back(ValueHash);
- Count.Valid++;
- }
- else
- {
- ZEN_WARN("PUTCACHERECORD - '{}/{}/{}' '{}' FAILED, attachment '{}' is not compressed",
- Request.Namespace,
- Request.Key.Bucket,
- Request.Key.Hash,
- ToString(ZenContentType::kCbPackage),
- ValueHash);
- Count.Invalid++;
- }
- }
- else if (m_CidStore.ContainsChunk(ValueHash))
+ ZenCacheValue FetchedCacheValue;
+ if (!Record)
+ {
+ if (!m_CacheStore.Get(Request.Context, Request.Namespace, Request.Key.Bucket, Request.Key.Hash, FetchedCacheValue))
+ {
+ // TODO: Log error
+ m_CacheStats.MissCount++;
+ return PutResult::Fail;
+ }
+ m_CacheStats.HitCount++;
+ if (FetchedCacheValue.Value.GetContentType() != ZenContentType::kCbObject)
+ {
+ // TODO: Log error
+ return PutResult::Invalid;
+ }
+ else
+ {
+ Record = CbObjectView(FetchedCacheValue.Value.GetData());
+ }
+ }
+
+ Record.IterateAttachments([this,
+ &Request,
+ Package,
+ &AttachmentsToStoreLocally,
+ &ValidAttachments,
+ &ReferencedAttachments,
+ &OutMissingAttachments,
+ &Count,
+ &TransferredSize](CbFieldView HashView) {
+ const IoHash ValueHash = HashView.AsHash();
+ ReferencedAttachments.push_back(ValueHash);
+ if (const CbAttachment* Attachment = Package ? Package->FindAttachment(ValueHash) : nullptr)
+ {
+ if (Attachment->IsCompressedBinary())
{
+ AttachmentsToStoreLocally.emplace_back(Attachment);
ValidAttachments.emplace_back(ValueHash);
Count.Valid++;
}
- Count.Total++;
- });
+ else
+ {
+ ZEN_WARN("PUTCACHERECORD - '{}/{}/{}' '{}' FAILED, attachment '{}' is not compressed",
+ Request.Namespace,
+ Request.Key.Bucket,
+ Request.Key.Hash,
+ ToString(ZenContentType::kCbPackage),
+ ValueHash);
+ Count.Invalid++;
+ }
+ }
+ else if (m_CidStore.ContainsChunk(ValueHash))
+ {
+ ValidAttachments.emplace_back(ValueHash);
+ Count.Valid++;
+ }
+ else
+ {
+ OutMissingAttachments.emplace_back(ValueHash);
+ }
+ Count.Total++;
+ });
if (Count.Invalid > 0)
{
return PutResult::Invalid;
}
- ZenCacheValue CacheValue;
- CacheValue.Value = IoBuffer(Record.GetSize());
- Record.CopyTo(MutableMemoryView(CacheValue.Value.MutableData(), CacheValue.Value.GetSize()));
- CacheValue.Value.SetContentType(ZenContentType::kCbObject);
- m_CacheStore.Put(Request.Context, Request.Namespace, Request.Key.Bucket, Request.Key.Hash, CacheValue, ReferencedAttachments);
- m_CacheStats.WriteCount++;
+ if (!FetchedCacheValue.Value)
+ {
+ ZenCacheValue CacheValue;
+ CacheValue.Value = IoBuffer(Record.GetSize());
+ Record.CopyTo(MutableMemoryView(CacheValue.Value.MutableData(), CacheValue.Value.GetSize()));
+ CacheValue.Value.SetContentType(ZenContentType::kCbObject);
+ m_CacheStore.Put(Request.Context, Request.Namespace, Request.Key.Bucket, Request.Key.Hash, CacheValue, ReferencedAttachments);
+ m_CacheStats.WriteCount++;
+ }
for (const CbAttachment* Attachment : AttachmentsToStoreLocally)
{
@@ -382,7 +702,7 @@ CacheRpcHandler::PutCacheRecord(PutRequestData& Request, const CbPackage* Packag
TransferredSize += Chunk.GetCompressedSize();
}
- ZEN_DEBUG("PUTCACHERECORD - '{}/{}/{}' {}, attachments '{}/{}/{}' (new/valid/total) in {}",
+ ZEN_DEBUG("PUTCACHERECORD - '{}/{}/{}' {}, attachments '{}/{}/{}/{}' (new/valid/total/missing) in {}",
Request.Namespace,
Request.Key.Bucket,
Request.Key.Hash,
@@ -390,6 +710,7 @@ CacheRpcHandler::PutCacheRecord(PutRequestData& Request, const CbPackage* Packag
Count.New,
Count.Valid,
Count.Total,
+ OutMissingAttachments.size(),
NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
const bool IsPartialRecord = Count.Valid != Count.Total;
diff --git a/src/zenstore/include/zenstore/cache/cacherpc.h b/src/zenstore/include/zenstore/cache/cacherpc.h
index a98010a7c..aeed8dce4 100644
--- a/src/zenstore/include/zenstore/cache/cacherpc.h
+++ b/src/zenstore/include/zenstore/cache/cacherpc.h
@@ -106,7 +106,10 @@ private:
CbPackage HandleRpcGetCacheValues(const CacheRequestContext& Context, CbObjectView BatchRequest);
CbPackage HandleRpcGetCacheChunks(const CacheRequestContext& Context, RpcAcceptOptions AcceptOptions, CbObjectView BatchRequest);
- PutResult PutCacheRecord(PutRequestData& Request, const CbPackage* Package);
+ CbPackage HandleRpcGetRecordAttachments(const CacheRequestContext& Context, CbObjectView BatchRequest);
+ CbPackage HandleRpcPutRecordAttachments(const CacheRequestContext& Context, const CbPackage& BatchRequest);
+
+ PutResult PutCacheRecord(PutRequestData& Request, const CbPackage* Package, std::vector<IoHash>& OutMissingAttachments);
/** HandleRpcGetCacheChunks Helper: Parse the Body object into RecordValue Requests and Value Requests. */
bool ParseGetCacheChunksRequest(std::string& Namespace,