diff options
| author | Dan Engelbrecht <[email protected]> | 2024-06-04 19:30:34 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2024-06-04 19:30:34 +0200 |
| commit | bbe46452530a98a0bd36c0024d4f3f914ae23604 (patch) | |
| tree | e9c901a6ec68d087bc7e746b38d1573b2999a0ef /src/zenstore/cache/cachedisklayer.cpp | |
| parent | Use a smaller thread pool for network operations when doing oplog import to r... (diff) | |
| download | zen-bbe46452530a98a0bd36c0024d4f3f914ae23604.tar.xz zen-bbe46452530a98a0bd36c0024d4f3f914ae23604.zip | |
add batching of CacheStore requests for GetCacheValues/GetCacheChunks (#90)
* cache file size of block on open
* add ability to control size limit for small chunk callback when iterating block
* Add batch fetch of cache values in the GetCacheValues request
Diffstat (limited to 'src/zenstore/cache/cachedisklayer.cpp')
| -rw-r--r-- | src/zenstore/cache/cachedisklayer.cpp | 360 |
1 file changed, 337 insertions, 23 deletions
diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp index d67e8d6c8..9dd2e4a67 100644 --- a/src/zenstore/cache/cachedisklayer.cpp +++ b/src/zenstore/cache/cachedisklayer.cpp @@ -1170,9 +1170,9 @@ ZenCacheDiskLayer::CacheBucket::GetStandaloneCacheValue(const DiskLocation& Loc, return {}; } -struct ZenCacheDiskLayer::CacheBucket::BatchHandle +struct ZenCacheDiskLayer::CacheBucket::PutBatchHandle { - BatchHandle(std::vector<bool>& OutResults) : OutResults(OutResults) {} + PutBatchHandle(std::vector<bool>& OutResults) : OutResults(OutResults) {} struct Entry { std::vector<IoHash> HashKeyAndReferences; @@ -1184,14 +1184,14 @@ struct ZenCacheDiskLayer::CacheBucket::BatchHandle std::vector<bool>& OutResults; }; -ZenCacheDiskLayer::CacheBucket::BatchHandle* +ZenCacheDiskLayer::CacheBucket::PutBatchHandle* ZenCacheDiskLayer::CacheBucket::BeginPutBatch(std::vector<bool>& OutResults) { - return new BatchHandle(OutResults); + return new PutBatchHandle(OutResults); } void -ZenCacheDiskLayer::CacheBucket::EndPutBatch(BatchHandle* Batch) noexcept +ZenCacheDiskLayer::CacheBucket::EndPutBatch(PutBatchHandle* Batch) noexcept { try { @@ -1272,6 +1272,232 @@ ZenCacheDiskLayer::CacheBucket::EndPutBatch(BatchHandle* Batch) noexcept } } +struct ZenCacheDiskLayer::CacheBucket::GetBatchHandle +{ + GetBatchHandle(std::vector<ZenCacheValue>& OutResults) : OutResults(OutResults) + { + Keys.reserve(OutResults.capacity()); + ResultIndexes.reserve(OutResults.capacity()); + } + + std::vector<IoHash> Keys; + std::vector<size_t> ResultIndexes; + + std::vector<ZenCacheValue>& OutResults; +}; + +ZenCacheDiskLayer::CacheBucket::GetBatchHandle* +ZenCacheDiskLayer::CacheBucket::BeginGetBatch(std::vector<ZenCacheValue>& OutResult) +{ + ZEN_TRACE_CPU("Z$::Bucket::GetBatched"); + return new GetBatchHandle(OutResult); +} + +void +ZenCacheDiskLayer::CacheBucket::EndGetBatch(GetBatchHandle* Batch) noexcept +{ + ZEN_TRACE_CPU("Z$::Bucket::GetBatched"); + + 
ZEN_ASSERT(Batch); + ZEN_ASSERT(Batch->Keys.size() == Batch->ResultIndexes.size()); + + metrics::RequestStats::Scope StatsScope(m_GetOps, 0); + + if (!Batch->ResultIndexes.empty()) + { + std::vector<DiskLocation> StandaloneDiskLocations; + std::vector<size_t> StandaloneKeyIndexes; + std::vector<DiskLocation> InlineDiskLocations; + std::vector<BlockStoreLocation> InlineBlockLocations; + std::vector<size_t> InlineKeyIndexes; + std::vector<bool> FillRawHashAndRawSize(Batch->Keys.size(), false); + { + RwLock::SharedLockScope IndexLock(m_IndexLock); + for (size_t KeyIndex = 0; KeyIndex < Batch->Keys.size(); KeyIndex++) + { + const IoHash& HashKey = Batch->Keys[KeyIndex]; + auto It = m_Index.find(HashKey); + if (It != m_Index.end()) + { + size_t ResultIndex = Batch->ResultIndexes[KeyIndex]; + ZenCacheValue& OutValue = Batch->OutResults[ResultIndex]; + + const PayloadIndex PayloadIdx = It.value(); + m_AccessTimes[PayloadIdx] = GcClock::TickCount(); + const BucketPayload& Payload = m_Payloads[PayloadIdx]; + const DiskLocation& Location = Payload.Location; + + FillRawHashAndRawSize[KeyIndex] = (!Location.IsFlagSet(DiskLocation::kStructured)) && (Location.Size() > 0); + if (Payload.MetaData) + { + const BucketMetaData& MetaData = m_MetaDatas[Payload.MetaData]; + OutValue.RawHash = MetaData.RawHash; + OutValue.RawSize = MetaData.RawSize; + FillRawHashAndRawSize[KeyIndex] = false; + } + + if (Payload.MemCached) + { + ZEN_ASSERT(!FillRawHashAndRawSize[KeyIndex]); + OutValue.Value = m_MemCachedPayloads[Payload.MemCached].Payload; + m_MemoryHitCount++; + } + else + { + if (m_Configuration.MemCacheSizeThreshold > 0) + { + m_MemoryMissCount++; + } + if (Location.IsFlagSet(DiskLocation::kStandaloneFile)) + { + StandaloneDiskLocations.push_back(Location); + StandaloneKeyIndexes.push_back(KeyIndex); + } + else + { + InlineDiskLocations.push_back(Location); + InlineBlockLocations.emplace_back(Location.GetBlockLocation(m_Configuration.PayloadAlignment)); + 
InlineKeyIndexes.push_back(KeyIndex); + } + } + } + } + } + + auto FillOne = [&](const DiskLocation& Location, size_t KeyIndex, IoBuffer&& Value) { + if (!Value) + { + return; + } + size_t ResultIndex = Batch->ResultIndexes[KeyIndex]; + ZenCacheValue& OutValue = Batch->OutResults[ResultIndex]; + OutValue.Value = std::move(Value); + OutValue.Value.SetContentType(Location.GetContentType()); + + bool AddToMemCache = false; + bool SetMetaInfo = FillRawHashAndRawSize[KeyIndex]; + if (m_Configuration.MemCacheSizeThreshold > 0) + { + size_t ValueSize = OutValue.Value.GetSize(); + if (OutValue.Value && ValueSize <= m_Configuration.MemCacheSizeThreshold) + { + OutValue.Value = IoBufferBuilder::ReadFromFileMaybe(OutValue.Value); + AddToMemCache = true; + } + } + + if (SetMetaInfo) + { + ZEN_TRACE_CPU("Z$::Bucket::EndGetBatch::MetaData"); + if (Location.IsFlagSet(DiskLocation::kCompressed)) + { + if (!CompressedBuffer::ValidateCompressedHeader(OutValue.Value, OutValue.RawHash, OutValue.RawSize)) + { + OutValue = ZenCacheValue{}; + } + } + else + { + OutValue.RawHash = IoHash::HashBuffer(OutValue.Value); + OutValue.RawSize = OutValue.Value.GetSize(); + } + } + + if (SetMetaInfo || AddToMemCache) + { + ZEN_TRACE_CPU("Z$::Bucket::EndGetBatch::MemCache"); + const IoHash& Key = Batch->Keys[KeyIndex]; + RwLock::ExclusiveLockScope UpdateIndexLock(m_IndexLock); + { + if (auto UpdateIt = m_Index.find(Key); UpdateIt != m_Index.end()) + { + BucketPayload& Payload = m_Payloads[UpdateIt->second]; + + // Only update if it has not already been updated by other thread + if (!Payload.MetaData && SetMetaInfo) + { + SetMetaData(UpdateIndexLock, Payload, {.RawSize = OutValue.RawSize, .RawHash = OutValue.RawHash}); + } + if (!Payload.MemCached && AddToMemCache) + { + SetMemCachedData(UpdateIndexLock, UpdateIt->second, OutValue.Value); + } + } + } + } + }; + + // We don't want to read into memory if they are to big since we might only want to touch the compressed + // header before sending it 
along + if (!InlineDiskLocations.empty()) + { + ZEN_TRACE_CPU("Z$::Bucket::EndGetBatch::ReadInline"); + m_BlockStore.IterateChunks(InlineBlockLocations, [&](uint32_t, std::span<const size_t> ChunkIndexes) -> bool { + const uint64_t LargeChunkSizeLimit = Max(m_Configuration.MemCacheSizeThreshold, 32u * 1024u); + m_BlockStore.IterateBlock( + InlineBlockLocations, + ChunkIndexes, + [this, &FillOne, &InlineDiskLocations, &InlineKeyIndexes](size_t ChunkIndex, const void* Data, uint64_t Size) -> bool { + if (Data != nullptr) + { + FillOne(InlineDiskLocations[ChunkIndex], + InlineKeyIndexes[ChunkIndex], + IoBufferBuilder::MakeCloneFromMemory(Data, Size)); + } + return true; + }, + [this, &FillOne, &InlineDiskLocations, &InlineKeyIndexes](size_t ChunkIndex, + BlockStoreFile& File, + uint64_t Offset, + uint64_t Size) -> bool { + FillOne(InlineDiskLocations[ChunkIndex], InlineKeyIndexes[ChunkIndex], File.GetChunk(Offset, Size)); + return true; + }, + LargeChunkSizeLimit); + return true; + }); + } + + if (!StandaloneDiskLocations.empty()) + { + ZEN_TRACE_CPU("Z$::Bucket::EndGetBatch::ReadStandalone"); + for (size_t Index = 0; Index < StandaloneDiskLocations.size(); Index++) + { + size_t KeyIndex = StandaloneKeyIndexes[Index]; + const DiskLocation& Location = StandaloneDiskLocations[Index]; + FillOne(Location, KeyIndex, GetStandaloneCacheValue(Location, Batch->Keys[KeyIndex])); + } + } + + for (size_t ResultIndex : Batch->ResultIndexes) + { + bool Hit = !!Batch->OutResults[ResultIndex].Value; + if (Hit) + { + m_DiskHitCount++; + StatsScope.SetBytes(Batch->OutResults[ResultIndex].Value.GetSize()); + } + else + { + m_DiskMissCount++; + if (m_Configuration.MemCacheSizeThreshold > 0) + { + m_MemoryMissCount++; + } + } + } + } +} + +void +ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, GetBatchHandle& BatchHandle) +{ + ZEN_TRACE_CPU("Z$::Bucket::GetBatched"); + BatchHandle.Keys.push_back(HashKey); + BatchHandle.ResultIndexes.push_back(BatchHandle.OutResults.size()); + 
BatchHandle.OutResults.push_back({}); +} + bool ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutValue) { @@ -1379,6 +1605,7 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal } } } + if (OutValue.Value) { m_DiskHitCount++; @@ -1396,7 +1623,7 @@ void ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References, - BatchHandle* OptionalBatchHandle) + PutBatchHandle* OptionalBatchHandle) { ZEN_TRACE_CPU("Z$::Bucket::Put"); @@ -2717,7 +2944,7 @@ void ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References, - BatchHandle* OptionalBatchHandle) + PutBatchHandle* OptionalBatchHandle) { ZEN_TRACE_CPU("Z$::Bucket::PutInlineCacheValue"); if (OptionalBatchHandle != nullptr) @@ -3540,19 +3767,19 @@ ZenCacheDiskLayer::GetOrCreateBucket(std::string_view InBucket) return Result; } -struct ZenCacheDiskLayer::BatchHandle +struct ZenCacheDiskLayer::PutBatchHandle { - BatchHandle(std::vector<bool>& OutResults) : OutResults(OutResults) {} + PutBatchHandle(std::vector<bool>& OutResults) : OutResults(OutResults) {} struct BucketHandle { - CacheBucket* Bucket; - CacheBucket::BatchHandle* Handle; + CacheBucket* Bucket; + CacheBucket::PutBatchHandle* Handle; }; - void ForEach(const std::function<void(CacheBucket* Bucket, CacheBucket::BatchHandle* Handle)>& CB) noexcept + void ForEach(const std::function<void(CacheBucket* Bucket, CacheBucket::PutBatchHandle* Handle)>& CB) noexcept { RwLock::SharedLockScope _(Lock); - for (ZenCacheDiskLayer::BatchHandle::BucketHandle& BucketHandle : BucketHandles) + for (ZenCacheDiskLayer::PutBatchHandle::BucketHandle& BucketHandle : BucketHandles) { ZEN_ASSERT(BucketHandle.Bucket); ZEN_ASSERT(BucketHandle.Handle); @@ -3560,7 +3787,7 @@ struct ZenCacheDiskLayer::BatchHandle } } - CacheBucket::BatchHandle* GetHandle(CacheBucket* Bucket) + CacheBucket::PutBatchHandle* 
GetHandle(CacheBucket* Bucket) { { RwLock::SharedLockScope _(Lock); @@ -3572,7 +3799,7 @@ struct ZenCacheDiskLayer::BatchHandle } } - CacheBucket::BatchHandle* NewBucketHandle = Bucket->BeginPutBatch(OutResults); + CacheBucket::PutBatchHandle* NewBucketHandle = Bucket->BeginPutBatch(OutResults); if (NewBucketHandle == nullptr) { return nullptr; @@ -3583,14 +3810,14 @@ struct ZenCacheDiskLayer::BatchHandle std::find_if(BucketHandles.begin(), BucketHandles.end(), [&](BucketHandle& Handle) { return Handle.Bucket == Bucket; }); It != BucketHandles.end()) { - CacheBucket::BatchHandle* Result = It->Handle; + CacheBucket::PutBatchHandle* Result = It->Handle; ZEN_ASSERT(Result != nullptr); _.ReleaseNow(); Bucket->EndPutBatch(NewBucketHandle); return Result; } - BucketHandles.push_back(ZenCacheDiskLayer::BatchHandle::BucketHandle{.Bucket = Bucket, .Handle = NewBucketHandle}); + BucketHandles.push_back(ZenCacheDiskLayer::PutBatchHandle::BucketHandle{.Bucket = Bucket, .Handle = NewBucketHandle}); return NewBucketHandle; } @@ -3599,17 +3826,93 @@ struct ZenCacheDiskLayer::BatchHandle std::vector<bool>& OutResults; }; -ZenCacheDiskLayer::BatchHandle* +ZenCacheDiskLayer::PutBatchHandle* ZenCacheDiskLayer::BeginPutBatch(std::vector<bool>& OutResults) { - return new BatchHandle(OutResults); + return new PutBatchHandle(OutResults); +} + +void +ZenCacheDiskLayer::EndPutBatch(PutBatchHandle* Batch) noexcept +{ + ZEN_ASSERT(Batch); + Batch->ForEach([&](CacheBucket* Bucket, CacheBucket::PutBatchHandle* Handle) { Bucket->EndPutBatch(Handle); }); + delete Batch; +} + +struct ZenCacheDiskLayer::GetBatchHandle +{ + GetBatchHandle(std::vector<ZenCacheValue>& OutResults) : OutResults(OutResults) {} + struct BucketHandle + { + CacheBucket* Bucket; + CacheBucket::GetBatchHandle* Handle; + }; + + void ForEach(const std::function<void(CacheBucket* Bucket, CacheBucket::GetBatchHandle* Handle)>& CB) noexcept + { + RwLock::SharedLockScope _(Lock); + for 
(ZenCacheDiskLayer::GetBatchHandle::BucketHandle& BucketHandle : BucketHandles) + { + ZEN_ASSERT(BucketHandle.Bucket); + ZEN_ASSERT(BucketHandle.Handle); + CB(BucketHandle.Bucket, BucketHandle.Handle); + } + } + + CacheBucket::GetBatchHandle* GetHandle(CacheBucket* Bucket) + { + { + RwLock::SharedLockScope _(Lock); + if (auto It = + std::find_if(BucketHandles.begin(), BucketHandles.end(), [&](BucketHandle& Handle) { return Handle.Bucket == Bucket; }); + It != BucketHandles.end()) + { + return It->Handle; + } + } + + CacheBucket::GetBatchHandle* NewBucketHandle = Bucket->BeginGetBatch(OutResults); + if (NewBucketHandle == nullptr) + { + return nullptr; + } + + RwLock::ExclusiveLockScope _(Lock); + if (auto It = + std::find_if(BucketHandles.begin(), BucketHandles.end(), [&](BucketHandle& Handle) { return Handle.Bucket == Bucket; }); + It != BucketHandles.end()) + { + CacheBucket::GetBatchHandle* Result = It->Handle; + ZEN_ASSERT(Result != nullptr); + _.ReleaseNow(); + Bucket->EndGetBatch(NewBucketHandle); + return Result; + } + + BucketHandles.push_back(ZenCacheDiskLayer::GetBatchHandle::BucketHandle{.Bucket = Bucket, .Handle = NewBucketHandle}); + + return NewBucketHandle; + } + + RwLock Lock; + std::vector<BucketHandle> BucketHandles; + std::vector<ZenCacheValue>& OutResults; +}; + +ZenCacheDiskLayer::GetBatchHandle* +ZenCacheDiskLayer::BeginGetBatch(std::vector<ZenCacheValue>& OutResults) +{ + return new GetBatchHandle(OutResults); } void -ZenCacheDiskLayer::EndPutBatch(BatchHandle* Batch) noexcept +ZenCacheDiskLayer::EndGetBatch(GetBatchHandle* Batch) noexcept { + ZEN_TRACE_CPU("Z$::GetBatched"); ZEN_ASSERT(Batch); - Batch->ForEach([&](CacheBucket* Bucket, CacheBucket::BatchHandle* Handle) { Bucket->EndPutBatch(Handle); }); + Batch->ForEach([&](CacheBucket* Bucket, CacheBucket::GetBatchHandle* Handle) { Bucket->EndGetBatch(Handle); }); + TryMemCacheTrim(); delete Batch; } @@ -3630,17 +3933,28 @@ ZenCacheDiskLayer::Get(std::string_view InBucket, const IoHash& 
HashKey, ZenCach } void +ZenCacheDiskLayer::Get(std::string_view InBucket, const IoHash& HashKey, GetBatchHandle& BatchHandle) +{ + ZEN_TRACE_CPU("Z$::GetBatched"); + + if (CacheBucket* Bucket = GetOrCreateBucket(InBucket); Bucket != nullptr) + { + Bucket->Get(HashKey, *BatchHandle.GetHandle(Bucket)); + } +} + +void ZenCacheDiskLayer::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References, - BatchHandle* OptionalBatchHandle) + PutBatchHandle* OptionalBatchHandle) { ZEN_TRACE_CPU("Z$::Put"); if (CacheBucket* Bucket = GetOrCreateBucket(InBucket); Bucket != nullptr) { - CacheBucket::BatchHandle* BucketBatchHandle = OptionalBatchHandle == nullptr ? nullptr : OptionalBatchHandle->GetHandle(Bucket); + CacheBucket::PutBatchHandle* BucketBatchHandle = OptionalBatchHandle == nullptr ? nullptr : OptionalBatchHandle->GetHandle(Bucket); Bucket->Put(HashKey, Value, References, BucketBatchHandle); TryMemCacheTrim(); } |