diff options
| author | Dan Engelbrecht <[email protected]> | 2024-05-02 10:53:15 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2024-05-02 10:53:15 +0200 |
| commit | 1bafcb32cb48b2256a9d72995388b7df72058039 (patch) | |
| tree | 2385ec3de36c4f45018832eb36bcab6ac2d3670f /src | |
| parent | fix get project files loop (#68) (diff) | |
| download | zen-1bafcb32cb48b2256a9d72995388b7df72058039.tar.xz zen-1bafcb32cb48b2256a9d72995388b7df72058039.zip | |
batch cache put (#67)
- Improvement: Batch scope for put of cache values
Diffstat (limited to 'src')
| -rw-r--r-- | src/zenserver/cache/httpstructuredcache.cpp | 16 | ||||
| -rw-r--r-- | src/zenstore/cache/cachedisklayer.cpp | 213 | ||||
| -rw-r--r-- | src/zenstore/cache/cacherpc.cpp | 246 | ||||
| -rw-r--r-- | src/zenstore/cache/structuredcachestore.cpp | 81 | ||||
| -rw-r--r-- | src/zenstore/include/zenstore/cache/cachedisklayer.h | 23 | ||||
| -rw-r--r-- | src/zenstore/include/zenstore/cache/structuredcachestore.h | 29 |
6 files changed, 482 insertions, 126 deletions
diff --git a/src/zenserver/cache/httpstructuredcache.cpp b/src/zenserver/cache/httpstructuredcache.cpp index 8106e9db9..135eee57c 100644 --- a/src/zenserver/cache/httpstructuredcache.cpp +++ b/src/zenserver/cache/httpstructuredcache.cpp @@ -839,7 +839,7 @@ HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, con if (Success && StoreLocal) { - m_CacheStore.Put(RequestContext, Ref.Namespace, Ref.BucketSegment, Ref.HashKey, ClientResultValue, {}); + m_CacheStore.Put(RequestContext, Ref.Namespace, Ref.BucketSegment, Ref.HashKey, ClientResultValue, {}, nullptr); m_CacheStats.WriteCount++; } } @@ -925,8 +925,13 @@ HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, con if (StoreLocal) { - m_CacheStore - .Put(RequestContext, Ref.Namespace, Ref.BucketSegment, Ref.HashKey, CacheValue, ReferencedAttachments); + m_CacheStore.Put(RequestContext, + Ref.Namespace, + Ref.BucketSegment, + Ref.HashKey, + CacheValue, + ReferencedAttachments, + nullptr); m_CacheStats.WriteCount++; if (!WriteAttachmentBuffers.empty()) @@ -1067,7 +1072,8 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con Ref.BucketSegment, Ref.HashKey, {.Value = Body, .RawSize = RawSize, .RawHash = RawHash}, - {}); + {}, + nullptr); m_CacheStats.WriteCount++; if (HasUpstream && EnumHasAllFlags(PolicyFromUrl, CachePolicy::StoreRemote)) @@ -1116,7 +1122,7 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con TotalCount++; }); - m_CacheStore.Put(RequestContext, Ref.Namespace, Ref.BucketSegment, Ref.HashKey, {.Value = Body}, ReferencedAttachments); + m_CacheStore.Put(RequestContext, Ref.Namespace, Ref.BucketSegment, Ref.HashKey, {.Value = Body}, ReferencedAttachments, nullptr); m_CacheStats.WriteCount++; ZEN_DEBUG("PUTCACHERECORD - '{}/{}/{}' {} '{}' attachments '{}/{}' (valid/total) in {}", diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp index 51d547b3d..a497c8969 
100644 --- a/src/zenstore/cache/cachedisklayer.cpp +++ b/src/zenstore/cache/cachedisklayer.cpp @@ -1162,6 +1162,108 @@ ZenCacheDiskLayer::CacheBucket::GetStandaloneCacheValue(const DiskLocation& Loc, return {}; } +struct ZenCacheDiskLayer::CacheBucket::BatchHandle /* per-bucket accumulator for batched inline puts; flushed to the block store by EndPutBatch */ +{ + BatchHandle(std::vector<bool>& OutResults) : OutResults(OutResults) {} + struct Entry + { + std::vector<IoHash> HashKeyAndReferences; /* element 0 is the cache key, any further elements are its references */ + }; + std::vector<IoBuffer> Buffers; + std::vector<Entry> Entries; + std::vector<size_t> EntryResultIndexes; /* slot in OutResults that belongs to Buffers[i] / Entries[i] */ + + std::vector<bool>& OutResults; +}; + +ZenCacheDiskLayer::CacheBucket::BatchHandle* +ZenCacheDiskLayer::CacheBucket::BeginPutBatch(std::vector<bool>& OutResults) +{ + return new BatchHandle(OutResults); +} + +void +ZenCacheDiskLayer::CacheBucket::EndPutBatch(BatchHandle* Batch) noexcept +{ + try + { + ZEN_ASSERT(Batch); + if (!Batch->Buffers.empty()) + { + std::vector<uint8_t> EntryFlags; + for (const IoBuffer& Buffer : Batch->Buffers) + { + uint8_t Flags = 0; + if (Buffer.GetContentType() == ZenContentType::kCbObject) + { + Flags |= DiskLocation::kStructured; + } + else if (Buffer.GetContentType() == ZenContentType::kCompressedBinary) + { + Flags |= DiskLocation::kCompressed; + } + EntryFlags.push_back(Flags); + } + + size_t IndexOffset = 0; + m_BlockStore.WriteChunks(Batch->Buffers, m_Configuration.PayloadAlignment, [&](std::span<BlockStoreLocation> Locations) { + std::vector<DiskIndexEntry> DiskEntries; + { + RwLock::ExclusiveLockScope IndexLock(m_IndexLock); + for (size_t Index = 0; Index < Locations.size(); Index++) + { + DiskLocation Location(Locations[Index], m_Configuration.PayloadAlignment, EntryFlags[IndexOffset + Index]); + const std::vector<IoHash>& HashKeyAndReferences = Batch->Entries[IndexOffset + Index].HashKeyAndReferences; + ZEN_ASSERT(HashKeyAndReferences.size() >= 1); /* every entry holds at least its key; references are optional */ + const IoHash HashKey = HashKeyAndReferences[0]; + DiskEntries.push_back({.Key = HashKey, .Location = Location}); + if (m_TrackedCacheKeys) + { + 
m_TrackedCacheKeys->insert(HashKey); + } + if (m_TrackedReferences && HashKeyAndReferences.size() > 1) + { + m_TrackedReferences->insert(HashKeyAndReferences.begin() + 1, HashKeyAndReferences.end()); + } + if (auto It = m_Index.find(HashKey); It != m_Index.end()) + { + PayloadIndex EntryIndex = It.value(); + ZEN_ASSERT_SLOW(EntryIndex < PayloadIndex(m_AccessTimes.size())); + BucketPayload& Payload = m_Payloads[EntryIndex]; + + RemoveMemCachedData(IndexLock, Payload); + RemoveMetaData(IndexLock, Payload); + + Payload = (BucketPayload{.Location = Location}); + m_AccessTimes[EntryIndex] = GcClock::TickCount(); + } + else + { + PayloadIndex EntryIndex = PayloadIndex(m_Payloads.size()); + m_Payloads.emplace_back(BucketPayload{.Location = Location}); + m_AccessTimes.emplace_back(GcClock::TickCount()); + m_Index.insert_or_assign(HashKey, EntryIndex); + } + } + } + m_SlogFile.Append(DiskEntries); + for (size_t Index = 0; Index < Locations.size(); Index++) + { + size_t ResultIndex = Batch->EntryResultIndexes[IndexOffset + Index]; + ZEN_ASSERT(ResultIndex < Batch->OutResults.size()); + Batch->OutResults[ResultIndex] = true; + } + IndexOffset += Locations.size(); + }); + } + } + catch (std::exception& Ex) + { + ZEN_ERROR("Exception in cache bucket when ending batch put operation: '{}'", Ex.what()); + } + delete Batch; /* released after the try/catch so an exception while flushing does not leak the handle */ +} + bool ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutValue) { @@ -1283,7 +1385,10 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal } void -ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References) +ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, + const ZenCacheValue& Value, + std::span<IoHash> References, + BatchHandle* OptionalBatchHandle) { ZEN_TRACE_CPU("Z$::Bucket::Put"); @@ -1292,10 +1397,14 @@ ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& if (Value.Value.Size() >= 
m_Configuration.LargeObjectThreshold) { PutStandaloneCacheValue(HashKey, Value, References); + if (OptionalBatchHandle) + { + OptionalBatchHandle->OutResults.push_back(true); /* standalone values are written immediately, so their batch result is reported right away */ + } } else { - PutInlineCacheValue(HashKey, Value, References); + PutInlineCacheValue(HashKey, Value, References, OptionalBatchHandle); } m_DiskWriteCount++; @@ -2593,10 +2702,24 @@ ZenCacheDiskLayer::CacheBucket::GetMetaData(RwLock::SharedLockScope&, const Buck } void -ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References) +ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, + const ZenCacheValue& Value, + std::span<IoHash> References, + BatchHandle* OptionalBatchHandle) { ZEN_TRACE_CPU("Z$::Bucket::PutInlineCacheValue"); - + if (OptionalBatchHandle != nullptr) /* batched put: only queue the value here, EndPutBatch performs the write */ + { + OptionalBatchHandle->Buffers.push_back(Value.Value); + OptionalBatchHandle->Entries.push_back({}); + OptionalBatchHandle->EntryResultIndexes.push_back(OptionalBatchHandle->OutResults.size()); + OptionalBatchHandle->OutResults.push_back(false); + std::vector<IoHash>& HashKeyAndReferences = OptionalBatchHandle->Entries.back().HashKeyAndReferences; + HashKeyAndReferences.reserve(1 + References.size()); + HashKeyAndReferences.push_back(HashKey); + HashKeyAndReferences.insert(HashKeyAndReferences.end(), References.begin(), References.end()); /* copy the caller's references after the key (was a self-range insert) */ + return; + } uint8_t EntryFlags = 0; if (Value.Value.GetContentType() == ZenContentType::kCbObject) @@ -3423,6 +3546,79 @@ ZenCacheDiskLayer::GetOrCreateBucket(std::string_view InBucket) return Result; } +struct ZenCacheDiskLayer::BatchHandle +{ + BatchHandle(std::vector<bool>& OutResults) : OutResults(OutResults) {} + struct BucketHandle + { + CacheBucket* Bucket; + CacheBucket::BatchHandle* Handle; + }; + + void ForEach(const std::function<void(CacheBucket* Bucket, CacheBucket::BatchHandle* Handle)>& CB) noexcept + { + RwLock::SharedLockScope _(Lock); + for 
(ZenCacheDiskLayer::BatchHandle::BucketHandle& BucketHandle : BucketHandles) + { + ZEN_ASSERT(BucketHandle.Bucket); + ZEN_ASSERT(BucketHandle.Handle); + CB(BucketHandle.Bucket, BucketHandle.Handle); + } + } + + CacheBucket::BatchHandle* GetHandle(CacheBucket* Bucket) + { + { + RwLock::SharedLockScope _(Lock); + if (auto It = + std::find_if(BucketHandles.begin(), BucketHandles.end(), [&](BucketHandle& Handle) { return Handle.Bucket == Bucket; }); + It != BucketHandles.end()) + { + return It->Handle; + } + } + + CacheBucket::BatchHandle* NewBucketHandle = Bucket->BeginPutBatch(OutResults); + if (NewBucketHandle == nullptr) + { + return nullptr; + } + + RwLock::ExclusiveLockScope _(Lock); + if (auto It = + std::find_if(BucketHandles.begin(), BucketHandles.end(), [&](BucketHandle& Handle) { return Handle.Bucket == Bucket; }); + It != BucketHandles.end()) + { + CacheBucket::BatchHandle* Result = It->Handle; + ZEN_ASSERT(Result != nullptr); + _.ReleaseNow(); + Bucket->EndPutBatch(NewBucketHandle); + return Result; + } + + BucketHandles.push_back(ZenCacheDiskLayer::BatchHandle::BucketHandle{.Bucket = Bucket, .Handle = NewBucketHandle}); + + return NewBucketHandle; + } + RwLock Lock; + std::vector<BucketHandle> BucketHandles; + std::vector<bool>& OutResults; +}; + +ZenCacheDiskLayer::BatchHandle* +ZenCacheDiskLayer::BeginPutBatch(std::vector<bool>& OutResults) +{ + return new BatchHandle(OutResults); +} + +void +ZenCacheDiskLayer::EndPutBatch(BatchHandle* Batch) noexcept +{ + ZEN_ASSERT(Batch); + Batch->ForEach([&](CacheBucket* Bucket, CacheBucket::BatchHandle* Handle) { Bucket->EndPutBatch(Handle); }); + delete Batch; +} + bool ZenCacheDiskLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCacheValue& OutValue) { @@ -3440,13 +3636,18 @@ ZenCacheDiskLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCach } void -ZenCacheDiskLayer::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References) 
+ZenCacheDiskLayer::Put(std::string_view InBucket, + const IoHash& HashKey, + const ZenCacheValue& Value, + std::span<IoHash> References, + BatchHandle* OptionalBatchHandle) { ZEN_TRACE_CPU("Z$::Put"); if (CacheBucket* Bucket = GetOrCreateBucket(InBucket); Bucket != nullptr) { - Bucket->Put(HashKey, Value, References); + CacheBucket::BatchHandle* BucketBatchHandle = OptionalBatchHandle == nullptr ? nullptr : OptionalBatchHandle->GetHandle(Bucket); + Bucket->Put(HashKey, Value, References, BucketBatchHandle); TryMemCacheTrim(); } } diff --git a/src/zenstore/cache/cacherpc.cpp b/src/zenstore/cache/cacherpc.cpp index 60b2fa1a1..6871dfd56 100644 --- a/src/zenstore/cache/cacherpc.cpp +++ b/src/zenstore/cache/cacherpc.cpp @@ -377,7 +377,7 @@ CacheRpcHandler::PutCacheRecord(PutRequestData& Request, const CbPackage* Packag CacheValue.Value = IoBuffer(Record.GetSize()); Record.CopyTo(MutableMemoryView(CacheValue.Value.MutableData(), CacheValue.Value.GetSize())); CacheValue.Value.SetContentType(ZenContentType::kCbObject); - m_CacheStore.Put(Request.Context, Request.Namespace, Request.Key.Bucket, Request.Key.Hash, CacheValue, ReferencedAttachments); + m_CacheStore.Put(Request.Context, Request.Namespace, Request.Key.Bucket, Request.Key.Hash, CacheValue, ReferencedAttachments, nullptr); m_CacheStats.WriteCount++; if (!WriteAttachmentBuffers.empty()) @@ -652,8 +652,13 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb const IoHash ValueHash = HashView.AsHash(); ReferencedAttachments.push_back(ValueHash); }); - m_CacheStore - .Put(Context, *Namespace, Key.Bucket, Key.Hash, {.Value = {Request.RecordCacheValue}}, ReferencedAttachments); + m_CacheStore.Put(Context, + *Namespace, + Key.Bucket, + Key.Hash, + {.Value = {Request.RecordCacheValue}}, + ReferencedAttachments, + nullptr); m_CacheStats.WriteCount++; } ParseValues(Request); @@ -799,94 +804,139 @@ CacheRpcHandler::HandleRpcPutCacheValues(const CacheRequestContext& Context, con const bool 
HasUpstream = m_UpstreamCache.IsActive(); CbArrayView RequestsArray = Params["Requests"sv].AsArrayView(); - std::vector<bool> Results; - for (CbFieldView RequestField : RequestsArray) + std::vector<bool> BatchResults; + std::vector<size_t> BatchResultIndexes; + std::vector<bool> Results; + std::vector<CacheKey> UpstreamCacheKeys; { - ZEN_TRACE_CPU("Z$::RpcPutCacheValues::Request"); + Results.reserve(RequestsArray.Num()); + ZenCacheStore::PutBatch Batch(m_CacheStore, *Namespace, BatchResults); + for (CbFieldView RequestField : RequestsArray) + { + ZEN_TRACE_CPU("Z$::RpcPutCacheValues::Request"); - m_CacheStats.RpcValueBatchRequests.fetch_add(1); + m_CacheStats.RpcValueBatchRequests.fetch_add(1); - Stopwatch Timer; + Stopwatch Timer; - CbObjectView RequestObject = RequestField.AsObjectView(); - CbObjectView KeyView = RequestObject["Key"sv].AsObjectView(); + CbObjectView RequestObject = RequestField.AsObjectView(); + CbObjectView KeyView = RequestObject["Key"sv].AsObjectView(); - CacheKey Key; - if (!GetRpcRequestCacheKey(KeyView, Key)) - { - return CbPackage{}; - } + CacheKey Key; + if (!GetRpcRequestCacheKey(KeyView, Key)) + { + return CbPackage{}; + } - PolicyText = RequestObject["Policy"sv].AsString(); - CachePolicy Policy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : DefaultPolicy; - IoHash RawHash = RequestObject["RawHash"sv].AsBinaryAttachment(); - uint64_t RawSize = RequestObject["RawSize"sv].AsUInt64(); - bool Succeeded = false; - uint64_t TransferredSize = 0; + PolicyText = RequestObject["Policy"sv].AsString(); + CachePolicy Policy = !PolicyText.empty() ? 
ParseCachePolicy(PolicyText) : DefaultPolicy; + IoHash RawHash = RequestObject["RawHash"sv].AsBinaryAttachment(); + uint64_t RawSize = RequestObject["RawSize"sv].AsUInt64(); + bool Valid = false; + uint64_t TransferredSize = 0; - if (const CbAttachment* Attachment = BatchRequest.FindAttachment(RawHash)) - { - if (Attachment->IsCompressedBinary()) + if (const CbAttachment* Attachment = BatchRequest.FindAttachment(RawHash)) { - CompressedBuffer Chunk = Attachment->AsCompressedBinary(); - if (EnumHasAllFlags(Policy, CachePolicy::StoreRemote)) + if (Attachment->IsCompressedBinary()) { - // TODO: Implement upstream puts of CacheValues with StoreLocal == false. - // Currently ProcessCacheRecord requires that the value exist in the local cache to put it upstream. - Policy |= CachePolicy::StoreLocal; - } + CompressedBuffer Chunk = Attachment->AsCompressedBinary(); + if (EnumHasAllFlags(Policy, CachePolicy::StoreRemote)) + { + // TODO: Implement upstream puts of CacheValues with StoreLocal == false. + // Currently ProcessCacheRecord requires that the value exist in the local cache to put it upstream. 
+ Policy |= CachePolicy::StoreLocal; + } - if (EnumHasAllFlags(Policy, CachePolicy::StoreLocal)) - { - IoBuffer Value = Chunk.GetCompressed().Flatten().AsIoBuffer(); - Value.SetContentType(ZenContentType::kCompressedBinary); - if (RawSize == 0) + if (EnumHasAllFlags(Policy, CachePolicy::StoreLocal)) { - RawSize = Chunk.DecodeRawSize(); + IoBuffer Value = Chunk.GetCompressed().Flatten().AsIoBuffer(); + Value.SetContentType(ZenContentType::kCompressedBinary); + if (RawSize == 0) + { + RawSize = Chunk.DecodeRawSize(); + } + m_CacheStore.Put(Context, + *Namespace, + Key.Bucket, + Key.Hash, + {.Value = Value, .RawSize = RawSize, .RawHash = RawHash}, + {}, + &Batch); + m_CacheStats.WriteCount++; + TransferredSize = Chunk.GetCompressedSize(); + BatchResultIndexes.push_back(Results.size()); + Results.push_back(false); } - m_CacheStore - .Put(Context, *Namespace, Key.Bucket, Key.Hash, {.Value = Value, .RawSize = RawSize, .RawHash = RawHash}, {}); - m_CacheStats.WriteCount++; - TransferredSize = Chunk.GetCompressedSize(); + else + { + Results.push_back(true); + } + Valid = true; + } + else + { + ZEN_WARN("PUTCACHEVALUES - '{}/{}/{}/{}' FAILED, value is not compressed", *Namespace, Key.Bucket, Key.Hash, RawHash); + return CbPackage{}; } - Succeeded = true; } - else + else if (EnumHasAllFlags(Policy, CachePolicy::QueryLocal)) { - ZEN_WARN("PUTCACHEVALUES - '{}/{}/{}/{}' FAILED, value is not compressed", *Namespace, Key.Bucket, Key.Hash, RawHash); - return CbPackage{}; + ZenCacheValue ExistingValue; + if (m_CacheStore.Get(Context, *Namespace, Key.Bucket, Key.Hash, ExistingValue) && + IsCompressedBinary(ExistingValue.Value.GetContentType())) + { + Results.push_back(true); + Valid = true; + } + else + { + Results.push_back(false); + } } - } - else if (EnumHasAllFlags(Policy, CachePolicy::QueryLocal)) - { - ZenCacheValue ExistingValue; - if (m_CacheStore.Get(Context, *Namespace, Key.Bucket, Key.Hash, ExistingValue) && - IsCompressedBinary(ExistingValue.Value.GetContentType())) + 
// We do not search the Upstream. No data in a put means the caller is probing for whether they need to do a heavy put. + // If it doesn't exist locally they should do the heavy put rather than having us fetch it from upstream. + + if (HasUpstream && EnumHasAllFlags(Policy, CachePolicy::StoreRemote)) { - Succeeded = true; + UpstreamCacheKeys.push_back(Key); + } + else + { + UpstreamCacheKeys.push_back(CacheKey::Empty); } - } - // We do not search the Upstream. No data in a put means the caller is probing for whether they need to do a heavy put. - // If it doesn't exist locally they should do the heavy put rather than having us fetch it from upstream. - if (HasUpstream && Succeeded && EnumHasAllFlags(Policy, CachePolicy::StoreRemote)) - { - m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCompressedBinary, .Namespace = *Namespace, .Key = Key}); + ZEN_DEBUG("PUTCACHEVALUES - '{}/{}/{}' {}, '{}' in {}", + *Namespace, + Key.Bucket, + Key.Hash, + NiceBytes(TransferredSize), + Valid ? "Added"sv : "Invalid", + NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); } - Results.push_back(Succeeded); - ZEN_DEBUG("PUTCACHEVALUES - '{}/{}/{}' {}, '{}' in {}", - *Namespace, - Key.Bucket, - Key.Hash, - NiceBytes(TransferredSize), - Succeeded ? 
"Added"sv : "Invalid", - NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); } if (Results.empty()) { return CbPackage{}; } + ZEN_ASSERT(UpstreamCacheKeys.size() == Results.size()); + ZEN_ASSERT(BatchResults.size() == BatchResultIndexes.size()); + for (size_t Index = 0; Index < BatchResults.size(); Index++) + { + size_t BatchResultIndex = BatchResultIndexes[Index]; + ZEN_ASSERT(BatchResultIndex < Results.size()); + ZEN_ASSERT(Results[BatchResultIndex] == false); + Results[BatchResultIndex] = BatchResults[Index]; + } + + for (std::size_t Index = 0; Index < Results.size(); Index++) + { + if (Results[Index] && UpstreamCacheKeys[Index] != CacheKey::Empty) + { + m_UpstreamCache.EnqueueUpstream( + {.Type = ZenContentType::kCompressedBinary, .Namespace = *Namespace, .Key = UpstreamCacheKeys[Index]}); + } + } { ZEN_TRACE_CPU("Z$::RpcPutCacheValues::Response"); CbObjectWriter ResponseObject; @@ -1046,7 +1096,8 @@ CacheRpcHandler::HandleRpcGetCacheValues(const CacheRequestContext& Context, CbO Request.Key.Bucket, Request.Key.Hash, ZenCacheValue{.Value = Params.Value, .RawSize = Request.RawSize, .RawHash = Request.RawHash}, - {}); + {}, + nullptr); m_CacheStats.WriteCount++; } @@ -1314,36 +1365,36 @@ CacheRpcHandler::GetLocalCacheRecords(const CacheRequestContext& Context, if (!UpstreamRecordRequests.empty()) { - const auto OnCacheRecordGetComplete = - [this, Namespace, &RecordKeys, &Records, &RecordRequests, Context](CacheRecordGetCompleteParams&& Params) { - if (!Params.Record) - { - return; - } - CacheKeyRequest& RecordKey = Params.Request; - size_t RecordIndex = std::distance(RecordKeys.data(), &RecordKey); - RecordRequests[RecordIndex]->ElapsedTimeUs += static_cast<uint64_t>(Params.ElapsedSeconds * 1000000.0); - RecordBody& Record = Records[RecordIndex]; - - const CacheKey& Key = RecordKey.Key; - Record.Exists = true; - CbObject ObjectBuffer = CbObject::Clone(Params.Record); - Record.CacheValue = ObjectBuffer.GetBuffer().AsIoBuffer(); - 
Record.CacheValue.SetContentType(ZenContentType::kCbObject); - Record.Source = Params.Source; - - bool StoreLocal = EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::StoreLocal) && AreDiskWritesAllowed(); - if (StoreLocal) - { - std::vector<IoHash> ReferencedAttachments; - ObjectBuffer.IterateAttachments([&ReferencedAttachments](CbFieldView HashView) { - const IoHash ValueHash = HashView.AsHash(); - ReferencedAttachments.push_back(ValueHash); - }); - m_CacheStore.Put(Context, Namespace, Key.Bucket, Key.Hash, {.Value = Record.CacheValue}, ReferencedAttachments); - m_CacheStats.WriteCount++; - } - }; + const auto OnCacheRecordGetComplete = [this, Namespace, &RecordKeys, &Records, &RecordRequests, Context]( + CacheRecordGetCompleteParams&& Params) { + if (!Params.Record) + { + return; + } + CacheKeyRequest& RecordKey = Params.Request; + size_t RecordIndex = std::distance(RecordKeys.data(), &RecordKey); + RecordRequests[RecordIndex]->ElapsedTimeUs += static_cast<uint64_t>(Params.ElapsedSeconds * 1000000.0); + RecordBody& Record = Records[RecordIndex]; + + const CacheKey& Key = RecordKey.Key; + Record.Exists = true; + CbObject ObjectBuffer = CbObject::Clone(Params.Record); + Record.CacheValue = ObjectBuffer.GetBuffer().AsIoBuffer(); + Record.CacheValue.SetContentType(ZenContentType::kCbObject); + Record.Source = Params.Source; + + bool StoreLocal = EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::StoreLocal) && AreDiskWritesAllowed(); + if (StoreLocal) + { + std::vector<IoHash> ReferencedAttachments; + ObjectBuffer.IterateAttachments([&ReferencedAttachments](CbFieldView HashView) { + const IoHash ValueHash = HashView.AsHash(); + ReferencedAttachments.push_back(ValueHash); + }); + m_CacheStore.Put(Context, Namespace, Key.Bucket, Key.Hash, {.Value = Record.CacheValue}, ReferencedAttachments, nullptr); + m_CacheStats.WriteCount++; + } + }; m_UpstreamCache.GetCacheRecords(Namespace, UpstreamRecordRequests, std::move(OnCacheRecordGetComplete)); } @@ -1531,7 +1582,8 
@@ CacheRpcHandler::GetUpstreamCacheChunks(const CacheRequestContext& Context, Key.Key.Bucket, Key.Key.Hash, {.Value = Params.Value, .RawSize = Params.RawSize, .RawHash = Params.RawHash}, - {}); + {}, + nullptr); m_CacheStats.WriteCount++; } } diff --git a/src/zenstore/cache/structuredcachestore.cpp b/src/zenstore/cache/structuredcachestore.cpp index e7524271e..9eded2a50 100644 --- a/src/zenstore/cache/structuredcachestore.cpp +++ b/src/zenstore/cache/structuredcachestore.cpp @@ -137,6 +137,34 @@ ZenCacheNamespace::~ZenCacheNamespace() m_Gc.RemoveGcContributor(this); } +struct ZenCacheNamespace::BatchHandle +{ + ZenCacheDiskLayer::BatchHandle* DiskLayerHandle = nullptr; +}; + +ZenCacheNamespace::BatchHandle* +ZenCacheNamespace::BeginPutBatch(std::vector<bool>& OutResult) +{ + ZenCacheNamespace::BatchHandle* Handle = new ZenCacheNamespace::BatchHandle; + Handle->DiskLayerHandle = m_DiskLayer.BeginPutBatch(OutResult); + return Handle; +} + +void +ZenCacheNamespace::EndPutBatch(BatchHandle* Batch) noexcept +{ + try + { + ZEN_ASSERT(Batch); + m_DiskLayer.EndPutBatch(Batch->DiskLayerHandle); + delete Batch; + } + catch (std::exception& Ex) + { + ZEN_ERROR("Exception in cache namespace layer when ending batch put operation: '{}'", Ex.what()); + } +} + bool ZenCacheNamespace::Get(std::string_view InBucket, const IoHash& HashKey, ZenCacheValue& OutValue) { @@ -160,7 +188,11 @@ ZenCacheNamespace::Get(std::string_view InBucket, const IoHash& HashKey, ZenCach } void -ZenCacheNamespace::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References) +ZenCacheNamespace::Put(std::string_view InBucket, + const IoHash& HashKey, + const ZenCacheValue& Value, + std::span<IoHash> References, + BatchHandle* OptionalBatchHandle) { ZEN_TRACE_CPU("Z$::Namespace::Put"); @@ -170,7 +202,7 @@ ZenCacheNamespace::Put(std::string_view InBucket, const IoHash& HashKey, const Z ZEN_ASSERT(Value.Value.Size()); - m_DiskLayer.Put(InBucket, HashKey, 
Value, References); + m_DiskLayer.Put(InBucket, HashKey, Value, References, OptionalBatchHandle ? OptionalBatchHandle->DiskLayerHandle : nullptr); m_WriteCount++; } @@ -443,6 +475,31 @@ ZenCacheStore::LogWorker() } } +ZenCacheStore::PutBatch::PutBatch(ZenCacheStore& CacheStore, std::string_view InNamespace, std::vector<bool>& OutResult) +: m_CacheStore(CacheStore) +{ + if (m_Store = m_CacheStore.GetNamespace(InNamespace); m_Store) + { + m_NamespaceBatchHandle = m_Store->BeginPutBatch(OutResult); + } +} + +ZenCacheStore::PutBatch::~PutBatch() +{ + try + { + if (m_Store) + { + ZEN_ASSERT(m_NamespaceBatchHandle); + m_Store->EndPutBatch(m_NamespaceBatchHandle); + } + } + catch (std::exception& Ex) + { + ZEN_ERROR("Exception in cache store when ending batch put operation: '{}'", Ex.what()); + } +} + bool ZenCacheStore::Get(const CacheRequestContext& Context, std::string_view Namespace, @@ -477,8 +534,7 @@ ZenCacheStore::Get(const CacheRequestContext& Context, .Namespace = std::string(Namespace), .Bucket = std::string(Bucket), .HashKey = HashKey, - .Value = OutValue /*, - .Result = Result*/}); + .Value = OutValue}); }); if (Signal) { @@ -511,7 +567,8 @@ ZenCacheStore::Put(const CacheRequestContext& Context, std::string_view Bucket, const IoHash& HashKey, const ZenCacheValue& Value, - std::span<IoHash> References) + std::span<IoHash> References, + PutBatch* OptionalBatchHandle) { // Ad hoc rejection of known bad usage patterns for DDC bucket names @@ -532,12 +589,11 @@ ZenCacheStore::Put(const CacheRequestContext& Context, m_LogQueueLock.WithExclusiveLock([&]() { Signal = m_LogQueue.empty(); m_LogQueue.emplace_back(AccessLogItem{.Op = "PUT ", - .Context = Context, - .Namespace = std::string(Namespace), - .Bucket = std::string(Bucket), - .HashKey = HashKey, - .Value = Value /*, - .Result = true*/}); + .Context = Context, + .Namespace = std::string(Namespace), + .Bucket = std::string(Bucket), + .HashKey = HashKey, + .Value = Value}); }); if (Signal) { @@ -547,7 +603,8 @@ 
ZenCacheStore::Put(const CacheRequestContext& Context, if (ZenCacheNamespace* Store = GetNamespace(Namespace); Store) { - Store->Put(Bucket, HashKey, Value, References); + ZenCacheNamespace::BatchHandle* BatchHandle = OptionalBatchHandle ? OptionalBatchHandle->m_NamespaceBatchHandle : nullptr; + Store->Put(Bucket, HashKey, Value, References, BatchHandle); m_WriteCount++; return; } diff --git a/src/zenstore/include/zenstore/cache/cachedisklayer.h b/src/zenstore/include/zenstore/cache/cachedisklayer.h index 471cc5dcd..427c338d6 100644 --- a/src/zenstore/include/zenstore/cache/cachedisklayer.h +++ b/src/zenstore/include/zenstore/cache/cachedisklayer.h @@ -164,8 +164,16 @@ public: explicit ZenCacheDiskLayer(GcManager& Gc, JobQueue& JobQueue, const std::filesystem::path& RootDir, const Configuration& Config); ~ZenCacheDiskLayer(); + struct BatchHandle; + BatchHandle* BeginPutBatch(std::vector<bool>& OutResult); + void EndPutBatch(BatchHandle* Batch) noexcept; + bool Get(std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue); - void Put(std::string_view Bucket, const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References); + void Put(std::string_view Bucket, + const IoHash& HashKey, + const ZenCacheValue& Value, + std::span<IoHash> References, + BatchHandle* OptionalBatchHandle); bool Drop(); bool DropBucket(std::string_view Bucket); void Flush(); @@ -196,9 +204,13 @@ public: CacheBucket(GcManager& Gc, std::atomic_uint64_t& OuterCacheMemoryUsage, std::string BucketName, const BucketConfiguration& Config); ~CacheBucket(); - bool OpenOrCreate(std::filesystem::path BucketDir, bool AllowCreate = true); + bool OpenOrCreate(std::filesystem::path BucketDir, bool AllowCreate = true); + struct BatchHandle; + BatchHandle* BeginPutBatch(std::vector<bool>& OutResult); + void EndPutBatch(BatchHandle* Batch) noexcept; + bool Get(const IoHash& HashKey, ZenCacheValue& OutValue); - void Put(const IoHash& HashKey, const ZenCacheValue& Value, 
std::span<IoHash> References); + void Put(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References, BatchHandle* OptionalBatchHandle); uint64_t MemCacheTrim(GcClock::TimePoint ExpireTime); bool Drop(); void Flush(); @@ -327,7 +339,10 @@ public: void BuildPath(PathBuilderBase& Path, const IoHash& HashKey) const; void PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References); IoBuffer GetStandaloneCacheValue(const DiskLocation& Loc, const IoHash& HashKey) const; - void PutInlineCacheValue(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References); + void PutInlineCacheValue(const IoHash& HashKey, + const ZenCacheValue& Value, + std::span<IoHash> References, + BatchHandle* OptionalBatchHandle = nullptr); IoBuffer GetInlineCacheValue(const DiskLocation& Loc) const; CacheValueDetails::ValueDetails GetValueDetails(RwLock::SharedLockScope&, const IoHash& Key, PayloadIndex Index) const; diff --git a/src/zenstore/include/zenstore/cache/structuredcachestore.h b/src/zenstore/include/zenstore/cache/structuredcachestore.h index 3bed93d70..02bbeed77 100644 --- a/src/zenstore/include/zenstore/cache/structuredcachestore.h +++ b/src/zenstore/include/zenstore/cache/structuredcachestore.h @@ -80,8 +80,16 @@ public: ZenCacheNamespace(GcManager& Gc, JobQueue& JobQueue, const std::filesystem::path& RootDir, const Configuration& Config); ~ZenCacheNamespace(); + struct BatchHandle; + BatchHandle* BeginPutBatch(std::vector<bool>& OutResults); + void EndPutBatch(BatchHandle* Batch) noexcept; + bool Get(std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue); - void Put(std::string_view Bucket, const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References); + void Put(std::string_view Bucket, + const IoHash& HashKey, + const ZenCacheValue& Value, + std::span<IoHash> References, + BatchHandle* OptionalBatchHandle = nullptr); bool DropBucket(std::string_view Bucket); 
void EnumerateBucketContents(std::string_view Bucket, @@ -184,17 +192,34 @@ public: const DiskWriteBlocker* InDiskWriteBlocker); ~ZenCacheStore(); + class PutBatch + { + public: + PutBatch(ZenCacheStore& CacheStore, std::string_view Namespace, std::vector<bool>& OutResult); + ~PutBatch(); + + private: + ZenCacheStore& m_CacheStore; + ZenCacheNamespace* m_Store = nullptr; + ZenCacheNamespace::BatchHandle* m_NamespaceBatchHandle = nullptr; + + friend class ZenCacheStore; + }; + bool Get(const CacheRequestContext& Context, std::string_view Namespace, std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue); + void Put(const CacheRequestContext& Context, std::string_view Namespace, std::string_view Bucket, const IoHash& HashKey, const ZenCacheValue& Value, - std::span<IoHash> References); + std::span<IoHash> References, + PutBatch* OptionalBatchHandle = nullptr); + bool DropBucket(std::string_view Namespace, std::string_view Bucket); bool DropNamespace(std::string_view Namespace); void Flush(); |