aboutsummaryrefslogtreecommitdiff
path: root/src/zenstore/cache/cacherpc.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/zenstore/cache/cacherpc.cpp')
-rw-r--r--src/zenstore/cache/cacherpc.cpp246
1 files changed, 149 insertions, 97 deletions
diff --git a/src/zenstore/cache/cacherpc.cpp b/src/zenstore/cache/cacherpc.cpp
index 60b2fa1a1..6871dfd56 100644
--- a/src/zenstore/cache/cacherpc.cpp
+++ b/src/zenstore/cache/cacherpc.cpp
@@ -377,7 +377,7 @@ CacheRpcHandler::PutCacheRecord(PutRequestData& Request, const CbPackage* Packag
CacheValue.Value = IoBuffer(Record.GetSize());
Record.CopyTo(MutableMemoryView(CacheValue.Value.MutableData(), CacheValue.Value.GetSize()));
CacheValue.Value.SetContentType(ZenContentType::kCbObject);
- m_CacheStore.Put(Request.Context, Request.Namespace, Request.Key.Bucket, Request.Key.Hash, CacheValue, ReferencedAttachments);
+ m_CacheStore.Put(Request.Context, Request.Namespace, Request.Key.Bucket, Request.Key.Hash, CacheValue, ReferencedAttachments, nullptr);
m_CacheStats.WriteCount++;
if (!WriteAttachmentBuffers.empty())
@@ -652,8 +652,13 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb
const IoHash ValueHash = HashView.AsHash();
ReferencedAttachments.push_back(ValueHash);
});
- m_CacheStore
- .Put(Context, *Namespace, Key.Bucket, Key.Hash, {.Value = {Request.RecordCacheValue}}, ReferencedAttachments);
+ m_CacheStore.Put(Context,
+ *Namespace,
+ Key.Bucket,
+ Key.Hash,
+ {.Value = {Request.RecordCacheValue}},
+ ReferencedAttachments,
+ nullptr);
m_CacheStats.WriteCount++;
}
ParseValues(Request);
@@ -799,94 +804,139 @@ CacheRpcHandler::HandleRpcPutCacheValues(const CacheRequestContext& Context, con
const bool HasUpstream = m_UpstreamCache.IsActive();
CbArrayView RequestsArray = Params["Requests"sv].AsArrayView();
- std::vector<bool> Results;
- for (CbFieldView RequestField : RequestsArray)
+ std::vector<bool> BatchResults;
+ std::vector<size_t> BatchResultIndexes;
+ std::vector<bool> Results;
+ std::vector<CacheKey> UpstreamCacheKeys;
{
- ZEN_TRACE_CPU("Z$::RpcPutCacheValues::Request");
+ Results.reserve(RequestsArray.Num());
+ ZenCacheStore::PutBatch Batch(m_CacheStore, *Namespace, BatchResults);
+ for (CbFieldView RequestField : RequestsArray)
+ {
+ ZEN_TRACE_CPU("Z$::RpcPutCacheValues::Request");
- m_CacheStats.RpcValueBatchRequests.fetch_add(1);
+ m_CacheStats.RpcValueBatchRequests.fetch_add(1);
- Stopwatch Timer;
+ Stopwatch Timer;
- CbObjectView RequestObject = RequestField.AsObjectView();
- CbObjectView KeyView = RequestObject["Key"sv].AsObjectView();
+ CbObjectView RequestObject = RequestField.AsObjectView();
+ CbObjectView KeyView = RequestObject["Key"sv].AsObjectView();
- CacheKey Key;
- if (!GetRpcRequestCacheKey(KeyView, Key))
- {
- return CbPackage{};
- }
+ CacheKey Key;
+ if (!GetRpcRequestCacheKey(KeyView, Key))
+ {
+ return CbPackage{};
+ }
- PolicyText = RequestObject["Policy"sv].AsString();
- CachePolicy Policy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : DefaultPolicy;
- IoHash RawHash = RequestObject["RawHash"sv].AsBinaryAttachment();
- uint64_t RawSize = RequestObject["RawSize"sv].AsUInt64();
- bool Succeeded = false;
- uint64_t TransferredSize = 0;
+ PolicyText = RequestObject["Policy"sv].AsString();
+ CachePolicy Policy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : DefaultPolicy;
+ IoHash RawHash = RequestObject["RawHash"sv].AsBinaryAttachment();
+ uint64_t RawSize = RequestObject["RawSize"sv].AsUInt64();
+ bool Valid = false;
+ uint64_t TransferredSize = 0;
- if (const CbAttachment* Attachment = BatchRequest.FindAttachment(RawHash))
- {
- if (Attachment->IsCompressedBinary())
+ if (const CbAttachment* Attachment = BatchRequest.FindAttachment(RawHash))
{
- CompressedBuffer Chunk = Attachment->AsCompressedBinary();
- if (EnumHasAllFlags(Policy, CachePolicy::StoreRemote))
+ if (Attachment->IsCompressedBinary())
{
- // TODO: Implement upstream puts of CacheValues with StoreLocal == false.
- // Currently ProcessCacheRecord requires that the value exist in the local cache to put it upstream.
- Policy |= CachePolicy::StoreLocal;
- }
+ CompressedBuffer Chunk = Attachment->AsCompressedBinary();
+ if (EnumHasAllFlags(Policy, CachePolicy::StoreRemote))
+ {
+ // TODO: Implement upstream puts of CacheValues with StoreLocal == false.
+ // Currently ProcessCacheRecord requires that the value exist in the local cache to put it upstream.
+ Policy |= CachePolicy::StoreLocal;
+ }
- if (EnumHasAllFlags(Policy, CachePolicy::StoreLocal))
- {
- IoBuffer Value = Chunk.GetCompressed().Flatten().AsIoBuffer();
- Value.SetContentType(ZenContentType::kCompressedBinary);
- if (RawSize == 0)
+ if (EnumHasAllFlags(Policy, CachePolicy::StoreLocal))
{
- RawSize = Chunk.DecodeRawSize();
+ IoBuffer Value = Chunk.GetCompressed().Flatten().AsIoBuffer();
+ Value.SetContentType(ZenContentType::kCompressedBinary);
+ if (RawSize == 0)
+ {
+ RawSize = Chunk.DecodeRawSize();
+ }
+ m_CacheStore.Put(Context,
+ *Namespace,
+ Key.Bucket,
+ Key.Hash,
+ {.Value = Value, .RawSize = RawSize, .RawHash = RawHash},
+ {},
+ &Batch);
+ m_CacheStats.WriteCount++;
+ TransferredSize = Chunk.GetCompressedSize();
+ BatchResultIndexes.push_back(Results.size());
+ Results.push_back(false);
}
- m_CacheStore
- .Put(Context, *Namespace, Key.Bucket, Key.Hash, {.Value = Value, .RawSize = RawSize, .RawHash = RawHash}, {});
- m_CacheStats.WriteCount++;
- TransferredSize = Chunk.GetCompressedSize();
+ else
+ {
+ Results.push_back(true);
+ }
+ Valid = true;
+ }
+ else
+ {
+ ZEN_WARN("PUTCACHEVALUES - '{}/{}/{}/{}' FAILED, value is not compressed", *Namespace, Key.Bucket, Key.Hash, RawHash);
+ return CbPackage{};
}
- Succeeded = true;
}
- else
+ else if (EnumHasAllFlags(Policy, CachePolicy::QueryLocal))
{
- ZEN_WARN("PUTCACHEVALUES - '{}/{}/{}/{}' FAILED, value is not compressed", *Namespace, Key.Bucket, Key.Hash, RawHash);
- return CbPackage{};
+ ZenCacheValue ExistingValue;
+ if (m_CacheStore.Get(Context, *Namespace, Key.Bucket, Key.Hash, ExistingValue) &&
+ IsCompressedBinary(ExistingValue.Value.GetContentType()))
+ {
+ Results.push_back(true);
+ Valid = true;
+ }
+ else
+ {
+ Results.push_back(false);
+ }
}
- }
- else if (EnumHasAllFlags(Policy, CachePolicy::QueryLocal))
- {
- ZenCacheValue ExistingValue;
- if (m_CacheStore.Get(Context, *Namespace, Key.Bucket, Key.Hash, ExistingValue) &&
- IsCompressedBinary(ExistingValue.Value.GetContentType()))
+ // We do not search the Upstream. No data in a put means the caller is probing for whether they need to do a heavy put.
+ // If it doesn't exist locally they should do the heavy put rather than having us fetch it from upstream.
+
+ if (HasUpstream && EnumHasAllFlags(Policy, CachePolicy::StoreRemote))
{
- Succeeded = true;
+ UpstreamCacheKeys.push_back(Key);
+ }
+ else
+ {
+ UpstreamCacheKeys.push_back(CacheKey::Empty);
}
- }
- // We do not search the Upstream. No data in a put means the caller is probing for whether they need to do a heavy put.
- // If it doesn't exist locally they should do the heavy put rather than having us fetch it from upstream.
- if (HasUpstream && Succeeded && EnumHasAllFlags(Policy, CachePolicy::StoreRemote))
- {
- m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCompressedBinary, .Namespace = *Namespace, .Key = Key});
+ ZEN_DEBUG("PUTCACHEVALUES - '{}/{}/{}' {}, '{}' in {}",
+ *Namespace,
+ Key.Bucket,
+ Key.Hash,
+ NiceBytes(TransferredSize),
+ Valid ? "Added"sv : "Invalid",
+ NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
}
- Results.push_back(Succeeded);
- ZEN_DEBUG("PUTCACHEVALUES - '{}/{}/{}' {}, '{}' in {}",
- *Namespace,
- Key.Bucket,
- Key.Hash,
- NiceBytes(TransferredSize),
- Succeeded ? "Added"sv : "Invalid",
- NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
}
if (Results.empty())
{
return CbPackage{};
}
+ ZEN_ASSERT(UpstreamCacheKeys.size() == Results.size());
+ ZEN_ASSERT(BatchResults.size() == BatchResultIndexes.size());
+ for (size_t Index = 0; Index < BatchResults.size(); Index++)
+ {
+ size_t BatchResultIndex = BatchResultIndexes[Index];
+ ZEN_ASSERT(BatchResultIndex < Results.size());
+ ZEN_ASSERT(Results[BatchResultIndex] == false);
+ Results[BatchResultIndex] = BatchResults[Index];
+ }
+
+ for (std::size_t Index = 0; Index < Results.size(); Index++)
+ {
+ if (Results[Index] && UpstreamCacheKeys[Index] != CacheKey::Empty)
+ {
+ m_UpstreamCache.EnqueueUpstream(
+ {.Type = ZenContentType::kCompressedBinary, .Namespace = *Namespace, .Key = UpstreamCacheKeys[Index]});
+ }
+ }
{
ZEN_TRACE_CPU("Z$::RpcPutCacheValues::Response");
CbObjectWriter ResponseObject;
@@ -1046,7 +1096,8 @@ CacheRpcHandler::HandleRpcGetCacheValues(const CacheRequestContext& Context, CbO
Request.Key.Bucket,
Request.Key.Hash,
ZenCacheValue{.Value = Params.Value, .RawSize = Request.RawSize, .RawHash = Request.RawHash},
- {});
+ {},
+ nullptr);
m_CacheStats.WriteCount++;
}
@@ -1314,36 +1365,36 @@ CacheRpcHandler::GetLocalCacheRecords(const CacheRequestContext& Context,
if (!UpstreamRecordRequests.empty())
{
- const auto OnCacheRecordGetComplete =
- [this, Namespace, &RecordKeys, &Records, &RecordRequests, Context](CacheRecordGetCompleteParams&& Params) {
- if (!Params.Record)
- {
- return;
- }
- CacheKeyRequest& RecordKey = Params.Request;
- size_t RecordIndex = std::distance(RecordKeys.data(), &RecordKey);
- RecordRequests[RecordIndex]->ElapsedTimeUs += static_cast<uint64_t>(Params.ElapsedSeconds * 1000000.0);
- RecordBody& Record = Records[RecordIndex];
-
- const CacheKey& Key = RecordKey.Key;
- Record.Exists = true;
- CbObject ObjectBuffer = CbObject::Clone(Params.Record);
- Record.CacheValue = ObjectBuffer.GetBuffer().AsIoBuffer();
- Record.CacheValue.SetContentType(ZenContentType::kCbObject);
- Record.Source = Params.Source;
-
- bool StoreLocal = EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::StoreLocal) && AreDiskWritesAllowed();
- if (StoreLocal)
- {
- std::vector<IoHash> ReferencedAttachments;
- ObjectBuffer.IterateAttachments([&ReferencedAttachments](CbFieldView HashView) {
- const IoHash ValueHash = HashView.AsHash();
- ReferencedAttachments.push_back(ValueHash);
- });
- m_CacheStore.Put(Context, Namespace, Key.Bucket, Key.Hash, {.Value = Record.CacheValue}, ReferencedAttachments);
- m_CacheStats.WriteCount++;
- }
- };
+ const auto OnCacheRecordGetComplete = [this, Namespace, &RecordKeys, &Records, &RecordRequests, Context](
+ CacheRecordGetCompleteParams&& Params) {
+ if (!Params.Record)
+ {
+ return;
+ }
+ CacheKeyRequest& RecordKey = Params.Request;
+ size_t RecordIndex = std::distance(RecordKeys.data(), &RecordKey);
+ RecordRequests[RecordIndex]->ElapsedTimeUs += static_cast<uint64_t>(Params.ElapsedSeconds * 1000000.0);
+ RecordBody& Record = Records[RecordIndex];
+
+ const CacheKey& Key = RecordKey.Key;
+ Record.Exists = true;
+ CbObject ObjectBuffer = CbObject::Clone(Params.Record);
+ Record.CacheValue = ObjectBuffer.GetBuffer().AsIoBuffer();
+ Record.CacheValue.SetContentType(ZenContentType::kCbObject);
+ Record.Source = Params.Source;
+
+ bool StoreLocal = EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::StoreLocal) && AreDiskWritesAllowed();
+ if (StoreLocal)
+ {
+ std::vector<IoHash> ReferencedAttachments;
+ ObjectBuffer.IterateAttachments([&ReferencedAttachments](CbFieldView HashView) {
+ const IoHash ValueHash = HashView.AsHash();
+ ReferencedAttachments.push_back(ValueHash);
+ });
+ m_CacheStore.Put(Context, Namespace, Key.Bucket, Key.Hash, {.Value = Record.CacheValue}, ReferencedAttachments, nullptr);
+ m_CacheStats.WriteCount++;
+ }
+ };
m_UpstreamCache.GetCacheRecords(Namespace, UpstreamRecordRequests, std::move(OnCacheRecordGetComplete));
}
@@ -1531,7 +1582,8 @@ CacheRpcHandler::GetUpstreamCacheChunks(const CacheRequestContext& Context,
Key.Key.Bucket,
Key.Key.Hash,
{.Value = Params.Value, .RawSize = Params.RawSize, .RawHash = Params.RawHash},
- {});
+ {},
+ nullptr);
m_CacheStats.WriteCount++;
}
}