aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2024-05-02 10:53:15 +0200
committerGitHub Enterprise <[email protected]>2024-05-02 10:53:15 +0200
commit1bafcb32cb48b2256a9d72995388b7df72058039 (patch)
tree2385ec3de36c4f45018832eb36bcab6ac2d3670f /src
parentfix get project files loop (#68) (diff)
downloadzen-1bafcb32cb48b2256a9d72995388b7df72058039.tar.xz
zen-1bafcb32cb48b2256a9d72995388b7df72058039.zip
batch cache put (#67)
- Improvement: Batch scope for put of cache values
Diffstat (limited to 'src')
-rw-r--r--src/zenserver/cache/httpstructuredcache.cpp16
-rw-r--r--src/zenstore/cache/cachedisklayer.cpp213
-rw-r--r--src/zenstore/cache/cacherpc.cpp246
-rw-r--r--src/zenstore/cache/structuredcachestore.cpp81
-rw-r--r--src/zenstore/include/zenstore/cache/cachedisklayer.h23
-rw-r--r--src/zenstore/include/zenstore/cache/structuredcachestore.h29
6 files changed, 482 insertions, 126 deletions
diff --git a/src/zenserver/cache/httpstructuredcache.cpp b/src/zenserver/cache/httpstructuredcache.cpp
index 8106e9db9..135eee57c 100644
--- a/src/zenserver/cache/httpstructuredcache.cpp
+++ b/src/zenserver/cache/httpstructuredcache.cpp
@@ -839,7 +839,7 @@ HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, con
if (Success && StoreLocal)
{
- m_CacheStore.Put(RequestContext, Ref.Namespace, Ref.BucketSegment, Ref.HashKey, ClientResultValue, {});
+ m_CacheStore.Put(RequestContext, Ref.Namespace, Ref.BucketSegment, Ref.HashKey, ClientResultValue, {}, nullptr);
m_CacheStats.WriteCount++;
}
}
@@ -925,8 +925,13 @@ HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, con
if (StoreLocal)
{
- m_CacheStore
- .Put(RequestContext, Ref.Namespace, Ref.BucketSegment, Ref.HashKey, CacheValue, ReferencedAttachments);
+ m_CacheStore.Put(RequestContext,
+ Ref.Namespace,
+ Ref.BucketSegment,
+ Ref.HashKey,
+ CacheValue,
+ ReferencedAttachments,
+ nullptr);
m_CacheStats.WriteCount++;
if (!WriteAttachmentBuffers.empty())
@@ -1067,7 +1072,8 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con
Ref.BucketSegment,
Ref.HashKey,
{.Value = Body, .RawSize = RawSize, .RawHash = RawHash},
- {});
+ {},
+ nullptr);
m_CacheStats.WriteCount++;
if (HasUpstream && EnumHasAllFlags(PolicyFromUrl, CachePolicy::StoreRemote))
@@ -1116,7 +1122,7 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con
TotalCount++;
});
- m_CacheStore.Put(RequestContext, Ref.Namespace, Ref.BucketSegment, Ref.HashKey, {.Value = Body}, ReferencedAttachments);
+ m_CacheStore.Put(RequestContext, Ref.Namespace, Ref.BucketSegment, Ref.HashKey, {.Value = Body}, ReferencedAttachments, nullptr);
m_CacheStats.WriteCount++;
ZEN_DEBUG("PUTCACHERECORD - '{}/{}/{}' {} '{}' attachments '{}/{}' (valid/total) in {}",
diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp
index 51d547b3d..a497c8969 100644
--- a/src/zenstore/cache/cachedisklayer.cpp
+++ b/src/zenstore/cache/cachedisklayer.cpp
@@ -1162,6 +1162,108 @@ ZenCacheDiskLayer::CacheBucket::GetStandaloneCacheValue(const DiskLocation& Loc,
return {};
}
+struct ZenCacheDiskLayer::CacheBucket::BatchHandle
+{
+ BatchHandle(std::vector<bool>& OutResults) : OutResults(OutResults) {}
+ struct Entry
+ {
+ std::vector<IoHash> HashKeyAndReferences;
+ };
+ std::vector<IoBuffer> Buffers;
+ std::vector<Entry> Entries;
+ std::vector<size_t> EntryResultIndexes;
+
+ std::vector<bool>& OutResults;
+};
+
+ZenCacheDiskLayer::CacheBucket::BatchHandle*
+ZenCacheDiskLayer::CacheBucket::BeginPutBatch(std::vector<bool>& OutResults)
+{
+ return new BatchHandle(OutResults);
+}
+
+void
+ZenCacheDiskLayer::CacheBucket::EndPutBatch(BatchHandle* Batch) noexcept
+{
+ try
+ {
+ ZEN_ASSERT(Batch);
+ if (!Batch->Buffers.empty())
+ {
+ std::vector<uint8_t> EntryFlags;
+ for (const IoBuffer& Buffer : Batch->Buffers)
+ {
+ uint8_t Flags = 0;
+ if (Buffer.GetContentType() == ZenContentType::kCbObject)
+ {
+ Flags |= DiskLocation::kStructured;
+ }
+ else if (Buffer.GetContentType() == ZenContentType::kCompressedBinary)
+ {
+ Flags |= DiskLocation::kCompressed;
+ }
+ EntryFlags.push_back(Flags);
+ }
+
+ size_t IndexOffset = 0;
+ m_BlockStore.WriteChunks(Batch->Buffers, m_Configuration.PayloadAlignment, [&](std::span<BlockStoreLocation> Locations) {
+ std::vector<DiskIndexEntry> DiskEntries;
+ {
+ RwLock::ExclusiveLockScope IndexLock(m_IndexLock);
+ for (size_t Index = 0; Index < Locations.size(); Index++)
+ {
+ DiskLocation Location(Locations[Index], m_Configuration.PayloadAlignment, EntryFlags[IndexOffset + Index]);
+ const std::vector<IoHash>& HashKeyAndReferences = Batch->Entries[IndexOffset + Index].HashKeyAndReferences;
+ ZEN_ASSERT(HashKeyAndReferences.size() > 1);
+ const IoHash HashKey = HashKeyAndReferences[0];
+ DiskEntries.push_back({.Key = HashKey, .Location = Location});
+ if (m_TrackedCacheKeys)
+ {
+ m_TrackedCacheKeys->insert(HashKey);
+ }
+ if (m_TrackedReferences && HashKeyAndReferences.size() > 1)
+ {
+ m_TrackedReferences->insert(HashKeyAndReferences.begin() + 1, HashKeyAndReferences.end());
+ }
+ if (auto It = m_Index.find(HashKey); It != m_Index.end())
+ {
+ PayloadIndex EntryIndex = It.value();
+ ZEN_ASSERT_SLOW(EntryIndex < PayloadIndex(m_AccessTimes.size()));
+ BucketPayload& Payload = m_Payloads[EntryIndex];
+
+ RemoveMemCachedData(IndexLock, Payload);
+ RemoveMetaData(IndexLock, Payload);
+
+ Payload = (BucketPayload{.Location = Location});
+ m_AccessTimes[EntryIndex] = GcClock::TickCount();
+ }
+ else
+ {
+ PayloadIndex EntryIndex = PayloadIndex(m_Payloads.size());
+ m_Payloads.emplace_back(BucketPayload{.Location = Location});
+ m_AccessTimes.emplace_back(GcClock::TickCount());
+ m_Index.insert_or_assign(HashKey, EntryIndex);
+ }
+ }
+ }
+ m_SlogFile.Append(DiskEntries);
+ for (size_t Index = 0; Index < Locations.size(); Index++)
+ {
+ size_t ResultIndex = Batch->EntryResultIndexes[IndexOffset + Index];
+ ZEN_ASSERT(ResultIndex < Batch->OutResults.size());
+ Batch->OutResults[ResultIndex] = true;
+ }
+ IndexOffset += Locations.size();
+ });
+ }
+ delete Batch;
+ }
+ catch (std::exception& Ex)
+ {
+ ZEN_ERROR("Exception in cache bucket when ending batch put operation: '{}'", Ex.what());
+ }
+}
+
bool
ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutValue)
{
@@ -1283,7 +1385,10 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal
}
void
-ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References)
+ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey,
+ const ZenCacheValue& Value,
+ std::span<IoHash> References,
+ BatchHandle* OptionalBatchHandle)
{
ZEN_TRACE_CPU("Z$::Bucket::Put");
@@ -1292,10 +1397,14 @@ ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue&
if (Value.Value.Size() >= m_Configuration.LargeObjectThreshold)
{
PutStandaloneCacheValue(HashKey, Value, References);
+ if (OptionalBatchHandle)
+ {
+ OptionalBatchHandle->OutResults.push_back(true);
+ }
}
else
{
- PutInlineCacheValue(HashKey, Value, References);
+ PutInlineCacheValue(HashKey, Value, References, OptionalBatchHandle);
}
m_DiskWriteCount++;
@@ -2593,10 +2702,24 @@ ZenCacheDiskLayer::CacheBucket::GetMetaData(RwLock::SharedLockScope&, const Buck
}
void
-ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References)
+ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey,
+ const ZenCacheValue& Value,
+ std::span<IoHash> References,
+ BatchHandle* OptionalBatchHandle)
{
ZEN_TRACE_CPU("Z$::Bucket::PutInlineCacheValue");
-
+ if (OptionalBatchHandle != nullptr)
+ {
+ OptionalBatchHandle->Buffers.push_back(Value.Value);
+ OptionalBatchHandle->Entries.push_back({});
+ OptionalBatchHandle->EntryResultIndexes.push_back(OptionalBatchHandle->OutResults.size());
+ OptionalBatchHandle->OutResults.push_back(false);
+ std::vector<IoHash>& HashKeyAndReferences = OptionalBatchHandle->Entries.back().HashKeyAndReferences;
+ HashKeyAndReferences.reserve(1 + HashKeyAndReferences.size());
+ HashKeyAndReferences.push_back(HashKey);
+ HashKeyAndReferences.insert(HashKeyAndReferences.end(), HashKeyAndReferences.begin(), HashKeyAndReferences.end());
+ return;
+ }
uint8_t EntryFlags = 0;
if (Value.Value.GetContentType() == ZenContentType::kCbObject)
@@ -3423,6 +3546,79 @@ ZenCacheDiskLayer::GetOrCreateBucket(std::string_view InBucket)
return Result;
}
+struct ZenCacheDiskLayer::BatchHandle
+{
+ BatchHandle(std::vector<bool>& OutResults) : OutResults(OutResults) {}
+ struct BucketHandle
+ {
+ CacheBucket* Bucket;
+ CacheBucket::BatchHandle* Handle;
+ };
+
+ void ForEach(const std::function<void(CacheBucket* Bucket, CacheBucket::BatchHandle* Handle)>& CB) noexcept
+ {
+ RwLock::SharedLockScope _(Lock);
+ for (ZenCacheDiskLayer::BatchHandle::BucketHandle& BucketHandle : BucketHandles)
+ {
+ ZEN_ASSERT(BucketHandle.Bucket);
+ ZEN_ASSERT(BucketHandle.Handle);
+ CB(BucketHandle.Bucket, BucketHandle.Handle);
+ }
+ }
+
+ CacheBucket::BatchHandle* GetHandle(CacheBucket* Bucket)
+ {
+ {
+ RwLock::SharedLockScope _(Lock);
+ if (auto It =
+ std::find_if(BucketHandles.begin(), BucketHandles.end(), [&](BucketHandle& Handle) { return Handle.Bucket == Bucket; });
+ It != BucketHandles.end())
+ {
+ return It->Handle;
+ }
+ }
+
+ CacheBucket::BatchHandle* NewBucketHandle = Bucket->BeginPutBatch(OutResults);
+ if (NewBucketHandle == nullptr)
+ {
+ return nullptr;
+ }
+
+ RwLock::ExclusiveLockScope _(Lock);
+ if (auto It =
+ std::find_if(BucketHandles.begin(), BucketHandles.end(), [&](BucketHandle& Handle) { return Handle.Bucket == Bucket; });
+ It != BucketHandles.end())
+ {
+ CacheBucket::BatchHandle* Result = It->Handle;
+ ZEN_ASSERT(Result != nullptr);
+ _.ReleaseNow();
+ Bucket->EndPutBatch(NewBucketHandle);
+ return Result;
+ }
+
+ BucketHandles.push_back(ZenCacheDiskLayer::BatchHandle::BucketHandle{.Bucket = Bucket, .Handle = NewBucketHandle});
+
+ return NewBucketHandle;
+ }
+ RwLock Lock;
+ std::vector<BucketHandle> BucketHandles;
+ std::vector<bool>& OutResults;
+};
+
+ZenCacheDiskLayer::BatchHandle*
+ZenCacheDiskLayer::BeginPutBatch(std::vector<bool>& OutResults)
+{
+ return new BatchHandle(OutResults);
+}
+
+void
+ZenCacheDiskLayer::EndPutBatch(BatchHandle* Batch) noexcept
+{
+ ZEN_ASSERT(Batch);
+ Batch->ForEach([&](CacheBucket* Bucket, CacheBucket::BatchHandle* Handle) { Bucket->EndPutBatch(Handle); });
+ delete Batch;
+}
+
bool
ZenCacheDiskLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCacheValue& OutValue)
{
@@ -3440,13 +3636,18 @@ ZenCacheDiskLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCach
}
void
-ZenCacheDiskLayer::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References)
+ZenCacheDiskLayer::Put(std::string_view InBucket,
+ const IoHash& HashKey,
+ const ZenCacheValue& Value,
+ std::span<IoHash> References,
+ BatchHandle* OptionalBatchHandle)
{
ZEN_TRACE_CPU("Z$::Put");
if (CacheBucket* Bucket = GetOrCreateBucket(InBucket); Bucket != nullptr)
{
- Bucket->Put(HashKey, Value, References);
+ CacheBucket::BatchHandle* BucketBatchHandle = OptionalBatchHandle == nullptr ? nullptr : OptionalBatchHandle->GetHandle(Bucket);
+ Bucket->Put(HashKey, Value, References, BucketBatchHandle);
TryMemCacheTrim();
}
}
diff --git a/src/zenstore/cache/cacherpc.cpp b/src/zenstore/cache/cacherpc.cpp
index 60b2fa1a1..6871dfd56 100644
--- a/src/zenstore/cache/cacherpc.cpp
+++ b/src/zenstore/cache/cacherpc.cpp
@@ -377,7 +377,7 @@ CacheRpcHandler::PutCacheRecord(PutRequestData& Request, const CbPackage* Packag
CacheValue.Value = IoBuffer(Record.GetSize());
Record.CopyTo(MutableMemoryView(CacheValue.Value.MutableData(), CacheValue.Value.GetSize()));
CacheValue.Value.SetContentType(ZenContentType::kCbObject);
- m_CacheStore.Put(Request.Context, Request.Namespace, Request.Key.Bucket, Request.Key.Hash, CacheValue, ReferencedAttachments);
+ m_CacheStore.Put(Request.Context, Request.Namespace, Request.Key.Bucket, Request.Key.Hash, CacheValue, ReferencedAttachments, nullptr);
m_CacheStats.WriteCount++;
if (!WriteAttachmentBuffers.empty())
@@ -652,8 +652,13 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb
const IoHash ValueHash = HashView.AsHash();
ReferencedAttachments.push_back(ValueHash);
});
- m_CacheStore
- .Put(Context, *Namespace, Key.Bucket, Key.Hash, {.Value = {Request.RecordCacheValue}}, ReferencedAttachments);
+ m_CacheStore.Put(Context,
+ *Namespace,
+ Key.Bucket,
+ Key.Hash,
+ {.Value = {Request.RecordCacheValue}},
+ ReferencedAttachments,
+ nullptr);
m_CacheStats.WriteCount++;
}
ParseValues(Request);
@@ -799,94 +804,139 @@ CacheRpcHandler::HandleRpcPutCacheValues(const CacheRequestContext& Context, con
const bool HasUpstream = m_UpstreamCache.IsActive();
CbArrayView RequestsArray = Params["Requests"sv].AsArrayView();
- std::vector<bool> Results;
- for (CbFieldView RequestField : RequestsArray)
+ std::vector<bool> BatchResults;
+ std::vector<size_t> BatchResultIndexes;
+ std::vector<bool> Results;
+ std::vector<CacheKey> UpstreamCacheKeys;
{
- ZEN_TRACE_CPU("Z$::RpcPutCacheValues::Request");
+ Results.reserve(RequestsArray.Num());
+ ZenCacheStore::PutBatch Batch(m_CacheStore, *Namespace, BatchResults);
+ for (CbFieldView RequestField : RequestsArray)
+ {
+ ZEN_TRACE_CPU("Z$::RpcPutCacheValues::Request");
- m_CacheStats.RpcValueBatchRequests.fetch_add(1);
+ m_CacheStats.RpcValueBatchRequests.fetch_add(1);
- Stopwatch Timer;
+ Stopwatch Timer;
- CbObjectView RequestObject = RequestField.AsObjectView();
- CbObjectView KeyView = RequestObject["Key"sv].AsObjectView();
+ CbObjectView RequestObject = RequestField.AsObjectView();
+ CbObjectView KeyView = RequestObject["Key"sv].AsObjectView();
- CacheKey Key;
- if (!GetRpcRequestCacheKey(KeyView, Key))
- {
- return CbPackage{};
- }
+ CacheKey Key;
+ if (!GetRpcRequestCacheKey(KeyView, Key))
+ {
+ return CbPackage{};
+ }
- PolicyText = RequestObject["Policy"sv].AsString();
- CachePolicy Policy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : DefaultPolicy;
- IoHash RawHash = RequestObject["RawHash"sv].AsBinaryAttachment();
- uint64_t RawSize = RequestObject["RawSize"sv].AsUInt64();
- bool Succeeded = false;
- uint64_t TransferredSize = 0;
+ PolicyText = RequestObject["Policy"sv].AsString();
+ CachePolicy Policy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : DefaultPolicy;
+ IoHash RawHash = RequestObject["RawHash"sv].AsBinaryAttachment();
+ uint64_t RawSize = RequestObject["RawSize"sv].AsUInt64();
+ bool Valid = false;
+ uint64_t TransferredSize = 0;
- if (const CbAttachment* Attachment = BatchRequest.FindAttachment(RawHash))
- {
- if (Attachment->IsCompressedBinary())
+ if (const CbAttachment* Attachment = BatchRequest.FindAttachment(RawHash))
{
- CompressedBuffer Chunk = Attachment->AsCompressedBinary();
- if (EnumHasAllFlags(Policy, CachePolicy::StoreRemote))
+ if (Attachment->IsCompressedBinary())
{
- // TODO: Implement upstream puts of CacheValues with StoreLocal == false.
- // Currently ProcessCacheRecord requires that the value exist in the local cache to put it upstream.
- Policy |= CachePolicy::StoreLocal;
- }
+ CompressedBuffer Chunk = Attachment->AsCompressedBinary();
+ if (EnumHasAllFlags(Policy, CachePolicy::StoreRemote))
+ {
+ // TODO: Implement upstream puts of CacheValues with StoreLocal == false.
+ // Currently ProcessCacheRecord requires that the value exist in the local cache to put it upstream.
+ Policy |= CachePolicy::StoreLocal;
+ }
- if (EnumHasAllFlags(Policy, CachePolicy::StoreLocal))
- {
- IoBuffer Value = Chunk.GetCompressed().Flatten().AsIoBuffer();
- Value.SetContentType(ZenContentType::kCompressedBinary);
- if (RawSize == 0)
+ if (EnumHasAllFlags(Policy, CachePolicy::StoreLocal))
{
- RawSize = Chunk.DecodeRawSize();
+ IoBuffer Value = Chunk.GetCompressed().Flatten().AsIoBuffer();
+ Value.SetContentType(ZenContentType::kCompressedBinary);
+ if (RawSize == 0)
+ {
+ RawSize = Chunk.DecodeRawSize();
+ }
+ m_CacheStore.Put(Context,
+ *Namespace,
+ Key.Bucket,
+ Key.Hash,
+ {.Value = Value, .RawSize = RawSize, .RawHash = RawHash},
+ {},
+ &Batch);
+ m_CacheStats.WriteCount++;
+ TransferredSize = Chunk.GetCompressedSize();
+ BatchResultIndexes.push_back(Results.size());
+ Results.push_back(false);
}
- m_CacheStore
- .Put(Context, *Namespace, Key.Bucket, Key.Hash, {.Value = Value, .RawSize = RawSize, .RawHash = RawHash}, {});
- m_CacheStats.WriteCount++;
- TransferredSize = Chunk.GetCompressedSize();
+ else
+ {
+ Results.push_back(true);
+ }
+ Valid = true;
+ }
+ else
+ {
+ ZEN_WARN("PUTCACHEVALUES - '{}/{}/{}/{}' FAILED, value is not compressed", *Namespace, Key.Bucket, Key.Hash, RawHash);
+ return CbPackage{};
}
- Succeeded = true;
}
- else
+ else if (EnumHasAllFlags(Policy, CachePolicy::QueryLocal))
{
- ZEN_WARN("PUTCACHEVALUES - '{}/{}/{}/{}' FAILED, value is not compressed", *Namespace, Key.Bucket, Key.Hash, RawHash);
- return CbPackage{};
+ ZenCacheValue ExistingValue;
+ if (m_CacheStore.Get(Context, *Namespace, Key.Bucket, Key.Hash, ExistingValue) &&
+ IsCompressedBinary(ExistingValue.Value.GetContentType()))
+ {
+ Results.push_back(true);
+ Valid = true;
+ }
+ else
+ {
+ Results.push_back(false);
+ }
}
- }
- else if (EnumHasAllFlags(Policy, CachePolicy::QueryLocal))
- {
- ZenCacheValue ExistingValue;
- if (m_CacheStore.Get(Context, *Namespace, Key.Bucket, Key.Hash, ExistingValue) &&
- IsCompressedBinary(ExistingValue.Value.GetContentType()))
+ // We do not search the Upstream. No data in a put means the caller is probing for whether they need to do a heavy put.
+ // If it doesn't exist locally they should do the heavy put rather than having us fetch it from upstream.
+
+ if (HasUpstream && EnumHasAllFlags(Policy, CachePolicy::StoreRemote))
{
- Succeeded = true;
+ UpstreamCacheKeys.push_back(Key);
+ }
+ else
+ {
+ UpstreamCacheKeys.push_back(CacheKey::Empty);
}
- }
- // We do not search the Upstream. No data in a put means the caller is probing for whether they need to do a heavy put.
- // If it doesn't exist locally they should do the heavy put rather than having us fetch it from upstream.
- if (HasUpstream && Succeeded && EnumHasAllFlags(Policy, CachePolicy::StoreRemote))
- {
- m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCompressedBinary, .Namespace = *Namespace, .Key = Key});
+ ZEN_DEBUG("PUTCACHEVALUES - '{}/{}/{}' {}, '{}' in {}",
+ *Namespace,
+ Key.Bucket,
+ Key.Hash,
+ NiceBytes(TransferredSize),
+ Valid ? "Added"sv : "Invalid",
+ NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
}
- Results.push_back(Succeeded);
- ZEN_DEBUG("PUTCACHEVALUES - '{}/{}/{}' {}, '{}' in {}",
- *Namespace,
- Key.Bucket,
- Key.Hash,
- NiceBytes(TransferredSize),
- Succeeded ? "Added"sv : "Invalid",
- NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
}
if (Results.empty())
{
return CbPackage{};
}
+ ZEN_ASSERT(UpstreamCacheKeys.size() == Results.size());
+ ZEN_ASSERT(BatchResults.size() == BatchResultIndexes.size());
+ for (size_t Index = 0; Index < BatchResults.size(); Index++)
+ {
+ size_t BatchResultIndex = BatchResultIndexes[Index];
+ ZEN_ASSERT(BatchResultIndex < Results.size());
+ ZEN_ASSERT(Results[BatchResultIndex] == false);
+ Results[BatchResultIndex] = BatchResults[Index];
+ }
+
+ for (std::size_t Index = 0; Index < Results.size(); Index++)
+ {
+ if (Results[Index] && UpstreamCacheKeys[Index] != CacheKey::Empty)
+ {
+ m_UpstreamCache.EnqueueUpstream(
+ {.Type = ZenContentType::kCompressedBinary, .Namespace = *Namespace, .Key = UpstreamCacheKeys[Index]});
+ }
+ }
{
ZEN_TRACE_CPU("Z$::RpcPutCacheValues::Response");
CbObjectWriter ResponseObject;
@@ -1046,7 +1096,8 @@ CacheRpcHandler::HandleRpcGetCacheValues(const CacheRequestContext& Context, CbO
Request.Key.Bucket,
Request.Key.Hash,
ZenCacheValue{.Value = Params.Value, .RawSize = Request.RawSize, .RawHash = Request.RawHash},
- {});
+ {},
+ nullptr);
m_CacheStats.WriteCount++;
}
@@ -1314,36 +1365,36 @@ CacheRpcHandler::GetLocalCacheRecords(const CacheRequestContext& Context,
if (!UpstreamRecordRequests.empty())
{
- const auto OnCacheRecordGetComplete =
- [this, Namespace, &RecordKeys, &Records, &RecordRequests, Context](CacheRecordGetCompleteParams&& Params) {
- if (!Params.Record)
- {
- return;
- }
- CacheKeyRequest& RecordKey = Params.Request;
- size_t RecordIndex = std::distance(RecordKeys.data(), &RecordKey);
- RecordRequests[RecordIndex]->ElapsedTimeUs += static_cast<uint64_t>(Params.ElapsedSeconds * 1000000.0);
- RecordBody& Record = Records[RecordIndex];
-
- const CacheKey& Key = RecordKey.Key;
- Record.Exists = true;
- CbObject ObjectBuffer = CbObject::Clone(Params.Record);
- Record.CacheValue = ObjectBuffer.GetBuffer().AsIoBuffer();
- Record.CacheValue.SetContentType(ZenContentType::kCbObject);
- Record.Source = Params.Source;
-
- bool StoreLocal = EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::StoreLocal) && AreDiskWritesAllowed();
- if (StoreLocal)
- {
- std::vector<IoHash> ReferencedAttachments;
- ObjectBuffer.IterateAttachments([&ReferencedAttachments](CbFieldView HashView) {
- const IoHash ValueHash = HashView.AsHash();
- ReferencedAttachments.push_back(ValueHash);
- });
- m_CacheStore.Put(Context, Namespace, Key.Bucket, Key.Hash, {.Value = Record.CacheValue}, ReferencedAttachments);
- m_CacheStats.WriteCount++;
- }
- };
+ const auto OnCacheRecordGetComplete = [this, Namespace, &RecordKeys, &Records, &RecordRequests, Context](
+ CacheRecordGetCompleteParams&& Params) {
+ if (!Params.Record)
+ {
+ return;
+ }
+ CacheKeyRequest& RecordKey = Params.Request;
+ size_t RecordIndex = std::distance(RecordKeys.data(), &RecordKey);
+ RecordRequests[RecordIndex]->ElapsedTimeUs += static_cast<uint64_t>(Params.ElapsedSeconds * 1000000.0);
+ RecordBody& Record = Records[RecordIndex];
+
+ const CacheKey& Key = RecordKey.Key;
+ Record.Exists = true;
+ CbObject ObjectBuffer = CbObject::Clone(Params.Record);
+ Record.CacheValue = ObjectBuffer.GetBuffer().AsIoBuffer();
+ Record.CacheValue.SetContentType(ZenContentType::kCbObject);
+ Record.Source = Params.Source;
+
+ bool StoreLocal = EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::StoreLocal) && AreDiskWritesAllowed();
+ if (StoreLocal)
+ {
+ std::vector<IoHash> ReferencedAttachments;
+ ObjectBuffer.IterateAttachments([&ReferencedAttachments](CbFieldView HashView) {
+ const IoHash ValueHash = HashView.AsHash();
+ ReferencedAttachments.push_back(ValueHash);
+ });
+ m_CacheStore.Put(Context, Namespace, Key.Bucket, Key.Hash, {.Value = Record.CacheValue}, ReferencedAttachments, nullptr);
+ m_CacheStats.WriteCount++;
+ }
+ };
m_UpstreamCache.GetCacheRecords(Namespace, UpstreamRecordRequests, std::move(OnCacheRecordGetComplete));
}
@@ -1531,7 +1582,8 @@ CacheRpcHandler::GetUpstreamCacheChunks(const CacheRequestContext& Context,
Key.Key.Bucket,
Key.Key.Hash,
{.Value = Params.Value, .RawSize = Params.RawSize, .RawHash = Params.RawHash},
- {});
+ {},
+ nullptr);
m_CacheStats.WriteCount++;
}
}
diff --git a/src/zenstore/cache/structuredcachestore.cpp b/src/zenstore/cache/structuredcachestore.cpp
index e7524271e..9eded2a50 100644
--- a/src/zenstore/cache/structuredcachestore.cpp
+++ b/src/zenstore/cache/structuredcachestore.cpp
@@ -137,6 +137,34 @@ ZenCacheNamespace::~ZenCacheNamespace()
m_Gc.RemoveGcContributor(this);
}
+struct ZenCacheNamespace::BatchHandle
+{
+ ZenCacheDiskLayer::BatchHandle* DiskLayerHandle = nullptr;
+};
+
+ZenCacheNamespace::BatchHandle*
+ZenCacheNamespace::BeginPutBatch(std::vector<bool>& OutResult)
+{
+ ZenCacheNamespace::BatchHandle* Handle = new ZenCacheNamespace::BatchHandle;
+ Handle->DiskLayerHandle = m_DiskLayer.BeginPutBatch(OutResult);
+ return Handle;
+}
+
+void
+ZenCacheNamespace::EndPutBatch(BatchHandle* Batch) noexcept
+{
+ try
+ {
+ ZEN_ASSERT(Batch);
+ m_DiskLayer.EndPutBatch(Batch->DiskLayerHandle);
+ delete Batch;
+ }
+ catch (std::exception& Ex)
+ {
+ ZEN_ERROR("Exception in cache namespace layer when ending batch put operation: '{}'", Ex.what());
+ }
+}
+
bool
ZenCacheNamespace::Get(std::string_view InBucket, const IoHash& HashKey, ZenCacheValue& OutValue)
{
@@ -160,7 +188,11 @@ ZenCacheNamespace::Get(std::string_view InBucket, const IoHash& HashKey, ZenCach
}
void
-ZenCacheNamespace::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References)
+ZenCacheNamespace::Put(std::string_view InBucket,
+ const IoHash& HashKey,
+ const ZenCacheValue& Value,
+ std::span<IoHash> References,
+ BatchHandle* OptionalBatchHandle)
{
ZEN_TRACE_CPU("Z$::Namespace::Put");
@@ -170,7 +202,7 @@ ZenCacheNamespace::Put(std::string_view InBucket, const IoHash& HashKey, const Z
ZEN_ASSERT(Value.Value.Size());
- m_DiskLayer.Put(InBucket, HashKey, Value, References);
+ m_DiskLayer.Put(InBucket, HashKey, Value, References, OptionalBatchHandle ? OptionalBatchHandle->DiskLayerHandle : nullptr);
m_WriteCount++;
}
@@ -443,6 +475,31 @@ ZenCacheStore::LogWorker()
}
}
+ZenCacheStore::PutBatch::PutBatch(ZenCacheStore& CacheStore, std::string_view InNamespace, std::vector<bool>& OutResult)
+: m_CacheStore(CacheStore)
+{
+ if (m_Store = m_CacheStore.GetNamespace(InNamespace); m_Store)
+ {
+ m_NamespaceBatchHandle = m_Store->BeginPutBatch(OutResult);
+ }
+}
+
+ZenCacheStore::PutBatch::~PutBatch()
+{
+ try
+ {
+ if (m_Store)
+ {
+ ZEN_ASSERT(m_NamespaceBatchHandle);
+ m_Store->EndPutBatch(m_NamespaceBatchHandle);
+ }
+ }
+ catch (std::exception& Ex)
+ {
+ ZEN_ERROR("Exception in cache store when ending batch put operation: '{}'", Ex.what());
+ }
+}
+
bool
ZenCacheStore::Get(const CacheRequestContext& Context,
std::string_view Namespace,
@@ -477,8 +534,7 @@ ZenCacheStore::Get(const CacheRequestContext& Context,
.Namespace = std::string(Namespace),
.Bucket = std::string(Bucket),
.HashKey = HashKey,
- .Value = OutValue /*,
- .Result = Result*/});
+ .Value = OutValue});
});
if (Signal)
{
@@ -511,7 +567,8 @@ ZenCacheStore::Put(const CacheRequestContext& Context,
std::string_view Bucket,
const IoHash& HashKey,
const ZenCacheValue& Value,
- std::span<IoHash> References)
+ std::span<IoHash> References,
+ PutBatch* OptionalBatchHandle)
{
// Ad hoc rejection of known bad usage patterns for DDC bucket names
@@ -532,12 +589,11 @@ ZenCacheStore::Put(const CacheRequestContext& Context,
m_LogQueueLock.WithExclusiveLock([&]() {
Signal = m_LogQueue.empty();
m_LogQueue.emplace_back(AccessLogItem{.Op = "PUT ",
- .Context = Context,
- .Namespace = std::string(Namespace),
- .Bucket = std::string(Bucket),
- .HashKey = HashKey,
- .Value = Value /*,
- .Result = true*/});
+ .Context = Context,
+ .Namespace = std::string(Namespace),
+ .Bucket = std::string(Bucket),
+ .HashKey = HashKey,
+ .Value = Value});
});
if (Signal)
{
@@ -547,7 +603,8 @@ ZenCacheStore::Put(const CacheRequestContext& Context,
if (ZenCacheNamespace* Store = GetNamespace(Namespace); Store)
{
- Store->Put(Bucket, HashKey, Value, References);
+ ZenCacheNamespace::BatchHandle* BatchHandle = OptionalBatchHandle ? OptionalBatchHandle->m_NamespaceBatchHandle : nullptr;
+ Store->Put(Bucket, HashKey, Value, References, BatchHandle);
m_WriteCount++;
return;
}
diff --git a/src/zenstore/include/zenstore/cache/cachedisklayer.h b/src/zenstore/include/zenstore/cache/cachedisklayer.h
index 471cc5dcd..427c338d6 100644
--- a/src/zenstore/include/zenstore/cache/cachedisklayer.h
+++ b/src/zenstore/include/zenstore/cache/cachedisklayer.h
@@ -164,8 +164,16 @@ public:
explicit ZenCacheDiskLayer(GcManager& Gc, JobQueue& JobQueue, const std::filesystem::path& RootDir, const Configuration& Config);
~ZenCacheDiskLayer();
+ struct BatchHandle;
+ BatchHandle* BeginPutBatch(std::vector<bool>& OutResult);
+ void EndPutBatch(BatchHandle* Batch) noexcept;
+
bool Get(std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue);
- void Put(std::string_view Bucket, const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References);
+ void Put(std::string_view Bucket,
+ const IoHash& HashKey,
+ const ZenCacheValue& Value,
+ std::span<IoHash> References,
+ BatchHandle* OptionalBatchHandle);
bool Drop();
bool DropBucket(std::string_view Bucket);
void Flush();
@@ -196,9 +204,13 @@ public:
CacheBucket(GcManager& Gc, std::atomic_uint64_t& OuterCacheMemoryUsage, std::string BucketName, const BucketConfiguration& Config);
~CacheBucket();
- bool OpenOrCreate(std::filesystem::path BucketDir, bool AllowCreate = true);
+ bool OpenOrCreate(std::filesystem::path BucketDir, bool AllowCreate = true);
+ struct BatchHandle;
+ BatchHandle* BeginPutBatch(std::vector<bool>& OutResult);
+ void EndPutBatch(BatchHandle* Batch) noexcept;
+
bool Get(const IoHash& HashKey, ZenCacheValue& OutValue);
- void Put(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References);
+ void Put(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References, BatchHandle* OptionalBatchHandle);
uint64_t MemCacheTrim(GcClock::TimePoint ExpireTime);
bool Drop();
void Flush();
@@ -327,7 +339,10 @@ public:
void BuildPath(PathBuilderBase& Path, const IoHash& HashKey) const;
void PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References);
IoBuffer GetStandaloneCacheValue(const DiskLocation& Loc, const IoHash& HashKey) const;
- void PutInlineCacheValue(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References);
+ void PutInlineCacheValue(const IoHash& HashKey,
+ const ZenCacheValue& Value,
+ std::span<IoHash> References,
+ BatchHandle* OptionalBatchHandle = nullptr);
IoBuffer GetInlineCacheValue(const DiskLocation& Loc) const;
CacheValueDetails::ValueDetails GetValueDetails(RwLock::SharedLockScope&, const IoHash& Key, PayloadIndex Index) const;
diff --git a/src/zenstore/include/zenstore/cache/structuredcachestore.h b/src/zenstore/include/zenstore/cache/structuredcachestore.h
index 3bed93d70..02bbeed77 100644
--- a/src/zenstore/include/zenstore/cache/structuredcachestore.h
+++ b/src/zenstore/include/zenstore/cache/structuredcachestore.h
@@ -80,8 +80,16 @@ public:
ZenCacheNamespace(GcManager& Gc, JobQueue& JobQueue, const std::filesystem::path& RootDir, const Configuration& Config);
~ZenCacheNamespace();
+ struct BatchHandle;
+ BatchHandle* BeginPutBatch(std::vector<bool>& OutResults);
+ void EndPutBatch(BatchHandle* Batch) noexcept;
+
bool Get(std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue);
- void Put(std::string_view Bucket, const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References);
+ void Put(std::string_view Bucket,
+ const IoHash& HashKey,
+ const ZenCacheValue& Value,
+ std::span<IoHash> References,
+ BatchHandle* OptionalBatchHandle = nullptr);
bool DropBucket(std::string_view Bucket);
void EnumerateBucketContents(std::string_view Bucket,
@@ -184,17 +192,34 @@ public:
const DiskWriteBlocker* InDiskWriteBlocker);
~ZenCacheStore();
+ class PutBatch
+ {
+ public:
+ PutBatch(ZenCacheStore& CacheStore, std::string_view Namespace, std::vector<bool>& OutResult);
+ ~PutBatch();
+
+ private:
+ ZenCacheStore& m_CacheStore;
+ ZenCacheNamespace* m_Store = nullptr;
+ ZenCacheNamespace::BatchHandle* m_NamespaceBatchHandle = nullptr;
+
+ friend class ZenCacheStore;
+ };
+
bool Get(const CacheRequestContext& Context,
std::string_view Namespace,
std::string_view Bucket,
const IoHash& HashKey,
ZenCacheValue& OutValue);
+
void Put(const CacheRequestContext& Context,
std::string_view Namespace,
std::string_view Bucket,
const IoHash& HashKey,
const ZenCacheValue& Value,
- std::span<IoHash> References);
+ std::span<IoHash> References,
+ PutBatch* OptionalBatchHandle = nullptr);
+
bool DropBucket(std::string_view Namespace, std::string_view Bucket);
bool DropNamespace(std::string_view Namespace);
void Flush();