diff options
| author | Dan Engelbrecht <[email protected]> | 2024-06-04 19:30:34 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2024-06-04 19:30:34 +0200 |
| commit | bbe46452530a98a0bd36c0024d4f3f914ae23604 (patch) | |
| tree | e9c901a6ec68d087bc7e746b38d1573b2999a0ef /src | |
| parent | Use a smaller thread pool for network operations when doing oplog import to r... (diff) | |
| download | zen-bbe46452530a98a0bd36c0024d4f3f914ae23604.tar.xz zen-bbe46452530a98a0bd36c0024d4f3f914ae23604.zip | |
add batching of CacheStore requests for GetCacheValues/GetCacheChunks (#90)
* cache file size of block on open
* add ability to control size limit for small chunk callback when iterating block
* Add batch fetch of cache values in the GetCacheValues request
Diffstat (limited to 'src')
| -rw-r--r-- | src/zenstore/blockstore.cpp | 54 | ||||
| -rw-r--r-- | src/zenstore/cache/cachedisklayer.cpp | 360 | ||||
| -rw-r--r-- | src/zenstore/cache/cacherpc.cpp | 166 | ||||
| -rw-r--r-- | src/zenstore/cache/structuredcachestore.cpp | 145 | ||||
| -rw-r--r-- | src/zenstore/include/zenstore/blockstore.h | 4 | ||||
| -rw-r--r-- | src/zenstore/include/zenstore/cache/cachedisklayer.h | 32 | ||||
| -rw-r--r-- | src/zenstore/include/zenstore/cache/structuredcachestore.h | 39 |
7 files changed, 679 insertions, 121 deletions
diff --git a/src/zenstore/blockstore.cpp b/src/zenstore/blockstore.cpp index aef7b348b..6e289409c 100644 --- a/src/zenstore/blockstore.cpp +++ b/src/zenstore/blockstore.cpp @@ -64,7 +64,8 @@ BlockStoreFile::Open() return true; }); void* FileHandle = m_File.Handle(); - m_IoBuffer = IoBuffer(IoBuffer::File, FileHandle, 0, m_File.FileSize(), /*IsWholeFile*/ true); + m_CachedFileSize = m_File.FileSize(); + m_IoBuffer = IoBuffer(IoBuffer::File, FileHandle, 0, m_CachedFileSize, /*IsWholeFile*/ true); } void @@ -100,7 +101,7 @@ BlockStoreFile::Create(uint64_t InitialSize) uint64_t BlockStoreFile::FileSize() { - return m_File.FileSize(); + return m_CachedFileSize == 0 ? m_File.FileSize() : m_CachedFileSize; } void @@ -156,7 +157,8 @@ BlockStoreFile::IsOpen() const return !!m_IoBuffer; } -constexpr uint64_t IterateSmallChunkWindowSize = 2 * 1024 * 1024; +constexpr uint64_t DefaultIterateSmallChunkWindowSize = 2 * 1024 * 1024; +constexpr uint64_t IterateSmallChunkMaxGapSize = 4 * 1024; BlockStore::BlockStore() { @@ -1041,20 +1043,33 @@ bool BlockStore::IterateBlock(std::span<const BlockStoreLocation> ChunkLocations, std::span<const size_t> InChunkIndexes, const IterateChunksSmallSizeCallback& SmallSizeCallback, - const IterateChunksLargeSizeCallback& LargeSizeCallback) + const IterateChunksLargeSizeCallback& LargeSizeCallback, + uint64_t LargeSizeLimit) { if (InChunkIndexes.empty()) { return true; } + uint64_t IterateSmallChunkWindowSize = Max(DefaultIterateSmallChunkWindowSize, LargeSizeLimit); + if (LargeSizeLimit == 0u) + { + LargeSizeLimit = IterateSmallChunkWindowSize; + } + else + { + IterateSmallChunkWindowSize = + Min((LargeSizeLimit + IterateSmallChunkMaxGapSize) * ChunkLocations.size(), IterateSmallChunkWindowSize); + } + uint32_t BlockIndex = ChunkLocations[InChunkIndexes[0]].BlockIndex; std::vector<size_t> ChunkIndexes(InChunkIndexes.begin(), InChunkIndexes.end()); std::sort(ChunkIndexes.begin(), ChunkIndexes.end(), [&](size_t IndexA, size_t IndexB) -> bool { return ChunkLocations[IndexA].Offset < ChunkLocations[IndexB].Offset; }); - auto GetNextRange = [&ChunkLocations](std::span<const size_t> ChunkIndexes, size_t StartIndexOffset) -> size_t { + auto GetNextRange = [LargeSizeLimit, IterateSmallChunkWindowSize, &ChunkLocations](std::span<const size_t> ChunkIndexes, + size_t StartIndexOffset) -> size_t { size_t ChunkCount = 0; size_t StartIndex = ChunkIndexes[StartIndexOffset]; const BlockStoreLocation& StartLocation = ChunkLocations[StartIndex]; @@ -1065,7 +1080,11 @@ BlockStore::IterateBlock(std::span<const BlockStoreLocation> ChunkLocations, size_t NextIndex = ChunkIndexes[StartIndexOffset + ChunkCount]; const BlockStoreLocation& Location = ChunkLocations[NextIndex]; ZEN_ASSERT(Location.BlockIndex == StartLocation.BlockIndex); - if (Location.Offset >= (LastEnd + (4u * 1024u))) + if (Location.Size > LargeSizeLimit) + { + break; + } + if (Location.Offset >= (LastEnd + IterateSmallChunkMaxGapSize)) { break; } @@ -1097,8 +1116,9 @@ BlockStore::IterateBlock(std::span<const BlockStoreLocation> ChunkLocations, } else { - const Ref<BlockStoreFile>& BlockFile = FindBlockIt->second; + Ref<BlockStoreFile> BlockFile = FindBlockIt->second; ZEN_ASSERT(BlockFile); + InsertLock.ReleaseNow(); IoBuffer ReadBuffer{IterateSmallChunkWindowSize}; void* BufferBase = ReadBuffer.MutableData(); @@ -1129,11 +1149,17 @@ BlockStore::IterateBlock(std::span<const BlockStoreLocation> ChunkLocations, BlockIndex, BlockSize); - SmallSizeCallback(NextChunkIndex, nullptr, 0); + if (!SmallSizeCallback(NextChunkIndex, nullptr, 0)) + { + return false; + } continue; } void* BufferPtr = &((char*)BufferBase)[ChunkLocation.Offset - FirstLocation.Offset]; - SmallSizeCallback(NextChunkIndex, BufferPtr, ChunkLocation.Size); + if (!SmallSizeCallback(NextChunkIndex, BufferPtr, ChunkLocation.Size)) + { + return false; + } } LocationIndexOffset += RangeCount; continue; @@ -1167,7 +1193,7 @@ BlockStore::IterateChunks(const std::span<const BlockStoreLocation>& ChunkLocati Stopwatch Timer; auto _ = MakeGuard([&]() { - ZEN_INFO("Iterated {} chunks from '{}' in {}", ChunkLocations.size(), m_BlocksBasePath, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + ZEN_DEBUG("Iterated {} chunks from '{}' in {}", ChunkLocations.size(), m_BlocksBasePath, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); ZEN_LOG_SCOPE("iterating chunks from '{}'", m_BlocksBasePath); @@ -1838,7 +1864,7 @@ TEST_CASE("blockstore.iterate.chunks") auto RootDirectory = TempDir.Path(); BlockStore Store; - Store.Initialize(RootDirectory / "store", IterateSmallChunkWindowSize * 2, 1024); + Store.Initialize(RootDirectory / "store", DefaultIterateSmallChunkWindowSize * 2, 1024); IoBuffer BadChunk = Store.TryGetChunk({.BlockIndex = 0, .Offset = 0, .Size = 512}); CHECK(!BadChunk); @@ -1849,13 +1875,13 @@ TEST_CASE("blockstore.iterate.chunks") BlockStoreLocation SecondChunkLocation = WriteStringAsChunk(Store, SecondChunkData, 4); Store.Flush(/*ForceNewBlock*/ false); - std::string VeryLargeChunk(IterateSmallChunkWindowSize * 2, 'L'); + std::string VeryLargeChunk(DefaultIterateSmallChunkWindowSize * 2, 'L'); BlockStoreLocation VeryLargeChunkLocation = WriteStringAsChunk(Store, VeryLargeChunk, 4); BlockStoreLocation BadLocationZeroSize = {.BlockIndex = 0, .Offset = 0, .Size = 0}; BlockStoreLocation BadLocationOutOfRange = {.BlockIndex = 0, - .Offset = IterateSmallChunkWindowSize, - .Size = IterateSmallChunkWindowSize * 2}; + .Offset = DefaultIterateSmallChunkWindowSize, + .Size = DefaultIterateSmallChunkWindowSize * 2}; BlockStoreLocation BadBlockIndex = {.BlockIndex = 0xfffff, .Offset = 1024, .Size = 1024}; WorkerThreadPool WorkerPool(4); diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp index d67e8d6c8..9dd2e4a67 100644 --- a/src/zenstore/cache/cachedisklayer.cpp +++ b/src/zenstore/cache/cachedisklayer.cpp @@ -1170,9 +1170,9 @@ ZenCacheDiskLayer::CacheBucket::GetStandaloneCacheValue(const DiskLocation& Loc, return {}; } -struct ZenCacheDiskLayer::CacheBucket::BatchHandle +struct ZenCacheDiskLayer::CacheBucket::PutBatchHandle { - BatchHandle(std::vector<bool>& OutResults) : OutResults(OutResults) {} + PutBatchHandle(std::vector<bool>& OutResults) : OutResults(OutResults) {} struct Entry { std::vector<IoHash> HashKeyAndReferences; @@ -1184,14 +1184,14 @@ struct ZenCacheDiskLayer::CacheBucket::BatchHandle std::vector<bool>& OutResults; }; -ZenCacheDiskLayer::CacheBucket::BatchHandle* +ZenCacheDiskLayer::CacheBucket::PutBatchHandle* ZenCacheDiskLayer::CacheBucket::BeginPutBatch(std::vector<bool>& OutResults) { - return new BatchHandle(OutResults); + return new PutBatchHandle(OutResults); } void -ZenCacheDiskLayer::CacheBucket::EndPutBatch(BatchHandle* Batch) noexcept +ZenCacheDiskLayer::CacheBucket::EndPutBatch(PutBatchHandle* Batch) noexcept { try { @@ -1272,6 +1272,232 @@ ZenCacheDiskLayer::CacheBucket::EndPutBatch(BatchHandle* Batch) noexcept } } +struct ZenCacheDiskLayer::CacheBucket::GetBatchHandle +{ + GetBatchHandle(std::vector<ZenCacheValue>& OutResults) : OutResults(OutResults) + { + Keys.reserve(OutResults.capacity()); + ResultIndexes.reserve(OutResults.capacity()); + } + + std::vector<IoHash> Keys; + std::vector<size_t> ResultIndexes; + + std::vector<ZenCacheValue>& OutResults; +}; + +ZenCacheDiskLayer::CacheBucket::GetBatchHandle* +ZenCacheDiskLayer::CacheBucket::BeginGetBatch(std::vector<ZenCacheValue>& OutResult) +{ + ZEN_TRACE_CPU("Z$::Bucket::GetBatched"); + return new GetBatchHandle(OutResult); +} + +void +ZenCacheDiskLayer::CacheBucket::EndGetBatch(GetBatchHandle* Batch) noexcept +{ + ZEN_TRACE_CPU("Z$::Bucket::GetBatched"); + + ZEN_ASSERT(Batch); + ZEN_ASSERT(Batch->Keys.size() == Batch->ResultIndexes.size()); + + metrics::RequestStats::Scope StatsScope(m_GetOps, 0); + + if (!Batch->ResultIndexes.empty()) + { + std::vector<DiskLocation> StandaloneDiskLocations; + std::vector<size_t> StandaloneKeyIndexes; + std::vector<DiskLocation> InlineDiskLocations; + std::vector<BlockStoreLocation> InlineBlockLocations; + std::vector<size_t> InlineKeyIndexes; + std::vector<bool> FillRawHashAndRawSize(Batch->Keys.size(), false); + { + RwLock::SharedLockScope IndexLock(m_IndexLock); + for (size_t KeyIndex = 0; KeyIndex < Batch->Keys.size(); KeyIndex++) + { + const IoHash& HashKey = Batch->Keys[KeyIndex]; + auto It = m_Index.find(HashKey); + if (It != m_Index.end()) + { + size_t ResultIndex = Batch->ResultIndexes[KeyIndex]; + ZenCacheValue& OutValue = Batch->OutResults[ResultIndex]; + + const PayloadIndex PayloadIdx = It.value(); + m_AccessTimes[PayloadIdx] = GcClock::TickCount(); + const BucketPayload& Payload = m_Payloads[PayloadIdx]; + const DiskLocation& Location = Payload.Location; + + FillRawHashAndRawSize[KeyIndex] = (!Location.IsFlagSet(DiskLocation::kStructured)) && (Location.Size() > 0); + if (Payload.MetaData) + { + const BucketMetaData& MetaData = m_MetaDatas[Payload.MetaData]; + OutValue.RawHash = MetaData.RawHash; + OutValue.RawSize = MetaData.RawSize; + FillRawHashAndRawSize[KeyIndex] = false; + } + + if (Payload.MemCached) + { + ZEN_ASSERT(!FillRawHashAndRawSize[KeyIndex]); + OutValue.Value = m_MemCachedPayloads[Payload.MemCached].Payload; + m_MemoryHitCount++; + } + else + { + if (m_Configuration.MemCacheSizeThreshold > 0) + { + m_MemoryMissCount++; + } + if (Location.IsFlagSet(DiskLocation::kStandaloneFile)) + { + StandaloneDiskLocations.push_back(Location); + StandaloneKeyIndexes.push_back(KeyIndex); + } + else + { + InlineDiskLocations.push_back(Location); + InlineBlockLocations.emplace_back(Location.GetBlockLocation(m_Configuration.PayloadAlignment)); + InlineKeyIndexes.push_back(KeyIndex); + } + } + } + } + } + + auto FillOne = [&](const DiskLocation& Location, size_t KeyIndex, IoBuffer&& Value) { + if (!Value) + { + return; + } + size_t ResultIndex = Batch->ResultIndexes[KeyIndex]; + ZenCacheValue& OutValue = Batch->OutResults[ResultIndex]; + OutValue.Value = std::move(Value); + OutValue.Value.SetContentType(Location.GetContentType()); + + bool AddToMemCache = false; + bool SetMetaInfo = FillRawHashAndRawSize[KeyIndex]; + if (m_Configuration.MemCacheSizeThreshold > 0) + { + size_t ValueSize = OutValue.Value.GetSize(); + if (OutValue.Value && ValueSize <= m_Configuration.MemCacheSizeThreshold) + { + OutValue.Value = IoBufferBuilder::ReadFromFileMaybe(OutValue.Value); + AddToMemCache = true; + } + } + + if (SetMetaInfo) + { + ZEN_TRACE_CPU("Z$::Bucket::EndGetBatch::MetaData"); + if (Location.IsFlagSet(DiskLocation::kCompressed)) + { + if (!CompressedBuffer::ValidateCompressedHeader(OutValue.Value, OutValue.RawHash, OutValue.RawSize)) + { + OutValue = ZenCacheValue{}; + } + } + else + { + OutValue.RawHash = IoHash::HashBuffer(OutValue.Value); + OutValue.RawSize = OutValue.Value.GetSize(); + } + } + + if (SetMetaInfo || AddToMemCache) + { + ZEN_TRACE_CPU("Z$::Bucket::EndGetBatch::MemCache"); + const IoHash& Key = Batch->Keys[KeyIndex]; + RwLock::ExclusiveLockScope UpdateIndexLock(m_IndexLock); + { + if (auto UpdateIt = m_Index.find(Key); UpdateIt != m_Index.end()) + { + BucketPayload& Payload = m_Payloads[UpdateIt->second]; + + // Only update if it has not already been updated by other thread + if (!Payload.MetaData && SetMetaInfo) + { + SetMetaData(UpdateIndexLock, Payload, {.RawSize = OutValue.RawSize, .RawHash = OutValue.RawHash}); + } + if (!Payload.MemCached && AddToMemCache) + { + SetMemCachedData(UpdateIndexLock, UpdateIt->second, OutValue.Value); + } + } + } + } + }; + + // We don't want to read into memory if they are to big since we might only want to touch the compressed + // header before sending it along + if (!InlineDiskLocations.empty()) + { + ZEN_TRACE_CPU("Z$::Bucket::EndGetBatch::ReadInline"); + m_BlockStore.IterateChunks(InlineBlockLocations, [&](uint32_t, std::span<const size_t> ChunkIndexes) -> bool { + const uint64_t LargeChunkSizeLimit = Max(m_Configuration.MemCacheSizeThreshold, 32u * 1024u); + m_BlockStore.IterateBlock( + InlineBlockLocations, + ChunkIndexes, + [this, &FillOne, &InlineDiskLocations, &InlineKeyIndexes](size_t ChunkIndex, const void* Data, uint64_t Size) -> bool { + if (Data != nullptr) + { + FillOne(InlineDiskLocations[ChunkIndex], + InlineKeyIndexes[ChunkIndex], + IoBufferBuilder::MakeCloneFromMemory(Data, Size)); + } + return true; + }, + [this, &FillOne, &InlineDiskLocations, &InlineKeyIndexes](size_t ChunkIndex, + BlockStoreFile& File, + uint64_t Offset, + uint64_t Size) -> bool { + FillOne(InlineDiskLocations[ChunkIndex], InlineKeyIndexes[ChunkIndex], File.GetChunk(Offset, Size)); + return true; + }, + LargeChunkSizeLimit); + return true; + }); + } + + if (!StandaloneDiskLocations.empty()) + { + ZEN_TRACE_CPU("Z$::Bucket::EndGetBatch::ReadStandalone"); + for (size_t Index = 0; Index < StandaloneDiskLocations.size(); Index++) + { + size_t KeyIndex = StandaloneKeyIndexes[Index]; + const DiskLocation& Location = StandaloneDiskLocations[Index]; + FillOne(Location, KeyIndex, GetStandaloneCacheValue(Location, Batch->Keys[KeyIndex])); + } + } + + for (size_t ResultIndex : Batch->ResultIndexes) + { + bool Hit = !!Batch->OutResults[ResultIndex].Value; + if (Hit) + { + m_DiskHitCount++; + StatsScope.SetBytes(Batch->OutResults[ResultIndex].Value.GetSize()); + } + else + { + m_DiskMissCount++; + if (m_Configuration.MemCacheSizeThreshold > 0) + { + m_MemoryMissCount++; + } + } + } + } +} + +void +ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, GetBatchHandle& BatchHandle) +{ + ZEN_TRACE_CPU("Z$::Bucket::GetBatched"); + BatchHandle.Keys.push_back(HashKey); + BatchHandle.ResultIndexes.push_back(BatchHandle.OutResults.size()); + BatchHandle.OutResults.push_back({}); +} + bool ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutValue) { @@ -1379,6 +1605,7 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal } } } + if (OutValue.Value) { m_DiskHitCount++; @@ -1396,7 +1623,7 @@ void ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References, - BatchHandle* OptionalBatchHandle) + PutBatchHandle* OptionalBatchHandle) { ZEN_TRACE_CPU("Z$::Bucket::Put"); @@ -2717,7 +2944,7 @@ void ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References, - BatchHandle* OptionalBatchHandle) + PutBatchHandle* OptionalBatchHandle) { ZEN_TRACE_CPU("Z$::Bucket::PutInlineCacheValue"); if (OptionalBatchHandle != nullptr) @@ -3540,19 +3767,19 @@ ZenCacheDiskLayer::GetOrCreateBucket(std::string_view InBucket) return Result; } -struct ZenCacheDiskLayer::BatchHandle +struct ZenCacheDiskLayer::PutBatchHandle { - BatchHandle(std::vector<bool>& OutResults) : OutResults(OutResults) {} + PutBatchHandle(std::vector<bool>& OutResults) : OutResults(OutResults) {} struct BucketHandle { - CacheBucket* Bucket; - CacheBucket::BatchHandle* Handle; + CacheBucket* Bucket; + CacheBucket::PutBatchHandle* Handle; }; - void ForEach(const std::function<void(CacheBucket* Bucket, CacheBucket::BatchHandle* Handle)>& CB) noexcept + void ForEach(const std::function<void(CacheBucket* Bucket, CacheBucket::PutBatchHandle* Handle)>& CB) noexcept { RwLock::SharedLockScope _(Lock); - for (ZenCacheDiskLayer::BatchHandle::BucketHandle& BucketHandle : BucketHandles) + for (ZenCacheDiskLayer::PutBatchHandle::BucketHandle& BucketHandle : BucketHandles) { ZEN_ASSERT(BucketHandle.Bucket); ZEN_ASSERT(BucketHandle.Handle); @@ -3560,7 +3787,7 @@ struct ZenCacheDiskLayer::BatchHandle } } - CacheBucket::BatchHandle* GetHandle(CacheBucket* Bucket) + CacheBucket::PutBatchHandle* GetHandle(CacheBucket* Bucket) { { RwLock::SharedLockScope _(Lock); @@ -3572,7 +3799,7 @@ struct ZenCacheDiskLayer::BatchHandle } } - CacheBucket::BatchHandle* NewBucketHandle = Bucket->BeginPutBatch(OutResults); + CacheBucket::PutBatchHandle* NewBucketHandle = Bucket->BeginPutBatch(OutResults); if (NewBucketHandle == nullptr) { return nullptr; @@ -3583,14 +3810,14 @@ struct ZenCacheDiskLayer::BatchHandle std::find_if(BucketHandles.begin(), BucketHandles.end(), [&](BucketHandle& Handle) { return Handle.Bucket == Bucket; }); It != BucketHandles.end()) { - CacheBucket::BatchHandle* Result = It->Handle; + CacheBucket::PutBatchHandle* Result = It->Handle; ZEN_ASSERT(Result != nullptr); _.ReleaseNow(); Bucket->EndPutBatch(NewBucketHandle); return Result; } - BucketHandles.push_back(ZenCacheDiskLayer::BatchHandle::BucketHandle{.Bucket = Bucket, .Handle = NewBucketHandle}); + BucketHandles.push_back(ZenCacheDiskLayer::PutBatchHandle::BucketHandle{.Bucket = Bucket, .Handle = NewBucketHandle}); return NewBucketHandle; } @@ -3599,17 +3826,93 @@ struct ZenCacheDiskLayer::BatchHandle std::vector<bool>& OutResults; }; -ZenCacheDiskLayer::BatchHandle* +ZenCacheDiskLayer::PutBatchHandle* ZenCacheDiskLayer::BeginPutBatch(std::vector<bool>& OutResults) { - return new BatchHandle(OutResults); + return new PutBatchHandle(OutResults); +} + +void +ZenCacheDiskLayer::EndPutBatch(PutBatchHandle* Batch) noexcept +{ + ZEN_ASSERT(Batch); + Batch->ForEach([&](CacheBucket* Bucket, CacheBucket::PutBatchHandle* Handle) { Bucket->EndPutBatch(Handle); }); + delete Batch; +} + +struct ZenCacheDiskLayer::GetBatchHandle +{ + GetBatchHandle(std::vector<ZenCacheValue>& OutResults) : OutResults(OutResults) {} + struct BucketHandle + { + CacheBucket* Bucket; + CacheBucket::GetBatchHandle* Handle; + }; + + void ForEach(const std::function<void(CacheBucket* Bucket, CacheBucket::GetBatchHandle* Handle)>& CB) noexcept + { + RwLock::SharedLockScope _(Lock); + for (ZenCacheDiskLayer::GetBatchHandle::BucketHandle& BucketHandle : BucketHandles) + { + ZEN_ASSERT(BucketHandle.Bucket); + ZEN_ASSERT(BucketHandle.Handle); + CB(BucketHandle.Bucket, BucketHandle.Handle); + } + } + + CacheBucket::GetBatchHandle* GetHandle(CacheBucket* Bucket) + { + { + RwLock::SharedLockScope _(Lock); + if (auto It = + std::find_if(BucketHandles.begin(), BucketHandles.end(), [&](BucketHandle& Handle) { return Handle.Bucket == Bucket; }); + It != BucketHandles.end()) + { + return It->Handle; + } + } + + CacheBucket::GetBatchHandle* NewBucketHandle = Bucket->BeginGetBatch(OutResults); + if (NewBucketHandle == nullptr) + { + return nullptr; + } + + RwLock::ExclusiveLockScope _(Lock); + if (auto It = + std::find_if(BucketHandles.begin(), BucketHandles.end(), [&](BucketHandle& Handle) { return Handle.Bucket == Bucket; }); + It != BucketHandles.end()) + { + CacheBucket::GetBatchHandle* Result = It->Handle; + ZEN_ASSERT(Result != nullptr); + _.ReleaseNow(); + Bucket->EndGetBatch(NewBucketHandle); + return Result; + } + + BucketHandles.push_back(ZenCacheDiskLayer::GetBatchHandle::BucketHandle{.Bucket = Bucket, .Handle = NewBucketHandle}); + + return NewBucketHandle; + } + + RwLock Lock; + std::vector<BucketHandle> BucketHandles; + std::vector<ZenCacheValue>& OutResults; +}; + +ZenCacheDiskLayer::GetBatchHandle* +ZenCacheDiskLayer::BeginGetBatch(std::vector<ZenCacheValue>& OutResults) +{ + return new GetBatchHandle(OutResults); } void -ZenCacheDiskLayer::EndPutBatch(BatchHandle* Batch) noexcept +ZenCacheDiskLayer::EndGetBatch(GetBatchHandle* Batch) noexcept { + ZEN_TRACE_CPU("Z$::GetBatched"); ZEN_ASSERT(Batch); - Batch->ForEach([&](CacheBucket* Bucket, CacheBucket::BatchHandle* Handle) { Bucket->EndPutBatch(Handle); }); + Batch->ForEach([&](CacheBucket* Bucket, CacheBucket::GetBatchHandle* Handle) { Bucket->EndGetBatch(Handle); }); + TryMemCacheTrim(); delete Batch; } @@ -3630,17 +3933,28 @@ ZenCacheDiskLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCach } void +ZenCacheDiskLayer::Get(std::string_view InBucket, const IoHash& HashKey, GetBatchHandle& BatchHandle) +{ + ZEN_TRACE_CPU("Z$::GetBatched"); + + if (CacheBucket* Bucket = GetOrCreateBucket(InBucket); Bucket != nullptr) + { + Bucket->Get(HashKey, *BatchHandle.GetHandle(Bucket)); + } +} + +void ZenCacheDiskLayer::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References, - BatchHandle* OptionalBatchHandle) + PutBatchHandle* OptionalBatchHandle) { ZEN_TRACE_CPU("Z$::Put"); if (CacheBucket* Bucket = GetOrCreateBucket(InBucket); Bucket != nullptr) { - CacheBucket::BatchHandle* BucketBatchHandle = OptionalBatchHandle == nullptr ? nullptr : OptionalBatchHandle->GetHandle(Bucket); + CacheBucket::PutBatchHandle* BucketBatchHandle = OptionalBatchHandle == nullptr ? nullptr : OptionalBatchHandle->GetHandle(Bucket); Bucket->Put(HashKey, Value, References, BucketBatchHandle); TryMemCacheTrim(); } diff --git a/src/zenstore/cache/cacherpc.cpp b/src/zenstore/cache/cacherpc.cpp index d28eda8c4..f6e5d16b3 100644 --- a/src/zenstore/cache/cacherpc.cpp +++ b/src/zenstore/cache/cacherpc.cpp @@ -985,69 +985,93 @@ CacheRpcHandler::HandleRpcGetCacheValues(const CacheRequestContext& Context, CbO const bool HasUpstream = m_UpstreamCache.IsActive(); - CbArrayView RequestsArray = Params["Requests"sv].AsArrayView(); - for (CbFieldView RequestField : RequestsArray) + CbArrayView RequestsArray = Params["Requests"sv].AsArrayView(); + std::vector<ZenCacheValue> CacheValues; + const uint64_t RequestCount = RequestsArray.Num(); + CacheValues.reserve(RequestCount); { - ZEN_TRACE_CPU("Z$::RpcGetCacheValues::Request"); - - m_CacheStats.RpcValueBatchRequests.fetch_add(1); + std::unique_ptr<ZenCacheStore::GetBatch> Batch; + if (RequestCount > 1) + { + Batch = std::make_unique<ZenCacheStore::GetBatch>(m_CacheStore, *Namespace, CacheValues); + } + for (CbFieldView RequestField : RequestsArray) + { + ZEN_TRACE_CPU("Z$::RpcGetCacheValues::Request"); - Stopwatch Timer; + m_CacheStats.RpcValueBatchRequests.fetch_add(1); - RequestData& Request = Requests.emplace_back(); - CbObjectView RequestObject = RequestField.AsObjectView(); - CbObjectView KeyObject = RequestObject["Key"sv].AsObjectView(); + RequestData& Request = Requests.emplace_back(); + CbObjectView RequestObject = RequestField.AsObjectView(); + CbObjectView KeyObject = RequestObject["Key"sv].AsObjectView(); - if (!GetRpcRequestCacheKey(KeyObject, Request.Key)) - { - return CbPackage{}; - } + if (!GetRpcRequestCacheKey(KeyObject, Request.Key)) + { + return CbPackage{}; + } - PolicyText = RequestObject["Policy"sv].AsString(); - Request.Policy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : DefaultPolicy; + PolicyText = RequestObject["Policy"sv].AsString(); + Request.Policy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : DefaultPolicy; - CacheKey& Key = Request.Key; - CachePolicy Policy = Request.Policy; + CacheKey& Key = Request.Key; + CachePolicy Policy = Request.Policy; - ZenCacheValue CacheValue; - if (EnumHasAllFlags(Policy, CachePolicy::QueryLocal)) + ZenCacheValue CacheValue; + if (EnumHasAllFlags(Policy, CachePolicy::QueryLocal)) + { + if (Batch) + { + m_CacheStore.Get(Context, *Namespace, Key.Bucket, Key.Hash, *Batch.get()); + } + else + { + CacheValues.push_back({}); + m_CacheStore.Get(Context, *Namespace, Key.Bucket, Key.Hash, CacheValues.back()); + } + } + else + { + CacheValues.push_back({}); + } + } + Batch.reset(); + ZEN_ASSERT(CacheValues.size() == RequestsArray.Num()); + } + for (size_t RequestIndex = 0; RequestIndex < CacheValues.size(); RequestIndex++) + { + RequestData& Request = Requests[RequestIndex]; + ZenCacheValue& Value = CacheValues[RequestIndex]; + if (Value.Value) { - if (m_CacheStore.Get(Context, *Namespace, Key.Bucket, Key.Hash, CacheValue) && - IsCompressedBinary(CacheValue.Value.GetContentType())) + if (IsCompressedBinary(Value.Value.GetContentType())) { - Request.RawHash = CacheValue.RawHash; - Request.RawSize = CacheValue.RawSize; - Request.Result = CompressedBuffer::FromCompressedNoValidate(std::move(CacheValue.Value)); + Request.RawHash = Value.RawHash; + Request.RawSize = Value.RawSize; + Request.Result = CompressedBuffer::FromCompressedNoValidate(std::move(Value.Value)); } } if (Request.Result) { - ZEN_DEBUG("GETCACHEVALUES HIT - '{}/{}/{}' {} ({}) in {}", + ZEN_DEBUG("GETCACHEVALUES HIT - '{}/{}/{}' {} ({})", *Namespace, - Key.Bucket, - Key.Hash, + Request.Key.Bucket, + Request.Key.Hash, NiceBytes(Request.Result.GetCompressed().GetSize()), - "LOCAL"sv, - NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); + "LOCAL"sv); m_CacheStats.HitCount++; } - else if (HasUpstream && EnumHasAllFlags(Policy, CachePolicy::QueryRemote)) + else if (HasUpstream && EnumHasAllFlags(Request.Policy, CachePolicy::QueryRemote)) { - RemoteRequestIndexes.push_back(Requests.size() - 1); + RemoteRequestIndexes.push_back(RequestIndex); } - else if (!EnumHasAnyFlags(Policy, CachePolicy::Query)) + else if (!EnumHasAnyFlags(Request.Policy, CachePolicy::Query)) { // If they requested no query, do not record this as a miss - ZEN_DEBUG("GETCACHEVALUES DISABLEDQUERY - '{}/{}/{}'", *Namespace, Key.Bucket, Key.Hash); + ZEN_DEBUG("GETCACHEVALUES DISABLEDQUERY - '{}/{}/{}'", *Namespace, Request.Key.Bucket, Request.Key.Hash); } else { - ZEN_DEBUG("GETCACHEVALUES MISS - '{}/{}/{}' ({}) in {}", - *Namespace, - Key.Bucket, - Key.Hash, - "LOCAL"sv, - NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); + ZEN_DEBUG("GETCACHEVALUES MISS - '{}/{}/{}' ({})", *Namespace, Request.Key.Bucket, Request.Key.Hash, "LOCAL"sv); m_CacheStats.MissCount++; } } @@ -1497,29 +1521,61 @@ CacheRpcHandler::GetLocalCacheValues(const CacheRequestContext& Context, using namespace cache::detail; const bool HasUpstream = m_UpstreamCache.IsActive(); - for (ChunkRequest* Request : ValueRequests) + std::vector<ZenCacheValue> Chunks; + Chunks.reserve(ValueRequests.size()); { - ZEN_TRACE_CPU("Z$::GetLocalCacheValues::Value"); - - Stopwatch Timer; - if (!Request->Exists && EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::QueryLocal)) + std::unique_ptr<ZenCacheStore::GetBatch> Batch; + if (ValueRequests.size() > 1) { - ZenCacheValue CacheValue; - if (m_CacheStore.Get(Context, Namespace, Request->Key->Key.Bucket, Request->Key->Key.Hash, CacheValue)) + Batch = std::make_unique<ZenCacheStore::GetBatch>(m_CacheStore, Namespace, Chunks); + } + for (ChunkRequest* Request : ValueRequests) + { + Stopwatch Timer; + if (!Request->Exists && EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::QueryLocal)) { - if (IsCompressedBinary(CacheValue.Value.GetContentType())) + if (Batch) { - Request->Key->ChunkId = CacheValue.RawHash; - Request->Exists = true; - Request->RawSize = CacheValue.RawSize; - Request->RawSizeKnown = true; - if (!EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::SkipData)) - { - Request->Value = CompressedBuffer::FromCompressedNoValidate(std::move(CacheValue.Value)); - } + m_CacheStore.Get(Context, Namespace, Request->Key->Key.Bucket, Request->Key->Key.Hash, *Batch.get()); + } + else + { + Chunks.push_back({}); + m_CacheStore.Get(Context, Namespace, Request->Key->Key.Bucket, Request->Key->Key.Hash, Chunks.back()); + } + } + else + { + Chunks.push_back({}); + } + Request->ElapsedTimeUs += Timer.GetElapsedTimeUs(); + } + } + for (size_t RequestIndex = 0; RequestIndex < ValueRequests.size(); RequestIndex++) + { + Stopwatch Timer; + ChunkRequest* Request = ValueRequests[RequestIndex]; + if (Chunks[RequestIndex].Value) + { + if (IsCompressedBinary(Chunks[RequestIndex].Value.GetContentType())) + { + Request->Key->ChunkId = Chunks[RequestIndex].RawHash; + Request->Exists = true; + Request->RawSize = Chunks[RequestIndex].RawSize; + Request->RawSizeKnown = true; + if (!EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::SkipData)) + { + Request->Value = CompressedBuffer::FromCompressedNoValidate(std::move(Chunks[RequestIndex].Value)); } } } + Request->ElapsedTimeUs += Timer.GetElapsedTimeUs(); + } + + for (ChunkRequest* Request : ValueRequests) + { + ZEN_TRACE_CPU("Z$::GetLocalCacheValues::Value"); + Stopwatch Timer; if (HasUpstream && !Request->Exists && EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::QueryRemote)) { if (EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::StoreLocal)) diff --git a/src/zenstore/cache/structuredcachestore.cpp b/src/zenstore/cache/structuredcachestore.cpp index 9eded2a50..d9c2d3e59 100644 --- a/src/zenstore/cache/structuredcachestore.cpp +++ b/src/zenstore/cache/structuredcachestore.cpp @@ -137,21 +137,21 @@ ZenCacheNamespace::~ZenCacheNamespace() m_Gc.RemoveGcContributor(this); } -struct ZenCacheNamespace::BatchHandle +struct ZenCacheNamespace::PutBatchHandle { - ZenCacheDiskLayer::BatchHandle* DiskLayerHandle = nullptr; + ZenCacheDiskLayer::PutBatchHandle* DiskLayerHandle = nullptr; }; -ZenCacheNamespace::BatchHandle* +ZenCacheNamespace::PutBatchHandle* ZenCacheNamespace::BeginPutBatch(std::vector<bool>& OutResult) { - ZenCacheNamespace::BatchHandle* Handle = new ZenCacheNamespace::BatchHandle; - Handle->DiskLayerHandle = m_DiskLayer.BeginPutBatch(OutResult); + ZenCacheNamespace::PutBatchHandle* Handle = new ZenCacheNamespace::PutBatchHandle; + Handle->DiskLayerHandle = m_DiskLayer.BeginPutBatch(OutResult); return Handle; } void -ZenCacheNamespace::EndPutBatch(BatchHandle* Batch) noexcept +ZenCacheNamespace::EndPutBatch(PutBatchHandle* Batch) noexcept { try { @@ -165,6 +165,50 @@ ZenCacheNamespace::EndPutBatch(BatchHandle* Batch) noexcept } } +struct ZenCacheNamespace::GetBatchHandle +{ + GetBatchHandle(std::vector<ZenCacheValue>& OutResult) : Results(OutResult) {} + std::vector<ZenCacheValue>& Results; + ZenCacheDiskLayer::GetBatchHandle* DiskLayerHandle = nullptr; +}; + +ZenCacheNamespace::GetBatchHandle* +ZenCacheNamespace::BeginGetBatch(std::vector<ZenCacheValue>& OutResult) +{ + ZenCacheNamespace::GetBatchHandle* Handle = new ZenCacheNamespace::GetBatchHandle(OutResult); + Handle->DiskLayerHandle = m_DiskLayer.BeginGetBatch(OutResult); + return Handle; +} + +void +ZenCacheNamespace::EndGetBatch(GetBatchHandle* Batch) noexcept +{ + try + { + ZEN_ASSERT(Batch); + m_DiskLayer.EndGetBatch(Batch->DiskLayerHandle); + + metrics::RequestStats::Scope StatsScope(m_GetOps, 0); + for (const ZenCacheValue& Result : Batch->Results) + { + if (Result.Value) + { + m_HitCount++; + StatsScope.SetBytes(Result.Value.Size()); + } + else + { + m_MissCount++; + } + } + delete Batch; + } + catch (std::exception& Ex) + { + ZEN_ERROR("Exception in cache namespace layer when ending batch put operation: '{}'", Ex.what()); + } +} + bool ZenCacheNamespace::Get(std::string_view InBucket, const IoHash& HashKey, ZenCacheValue& OutValue) { @@ -173,7 +217,6 @@ ZenCacheNamespace::Get(std::string_view InBucket, const IoHash& HashKey, ZenCach metrics::RequestStats::Scope StatsScope(m_GetOps, 0); bool Ok = m_DiskLayer.Get(InBucket, HashKey, OutValue); - if (Ok) { ZEN_ASSERT(OutValue.Value.Size()); @@ -188,11 +231,22 @@ ZenCacheNamespace::Get(std::string_view InBucket, const IoHash& HashKey, ZenCach } void +ZenCacheNamespace::Get(std::string_view InBucket, const IoHash& HashKey, GetBatchHandle& BatchHandle) +{ + ZEN_TRACE_CPU("Z$::Namespace::GetBatched"); + + metrics::RequestStats::Scope StatsScope(m_GetOps, 0); + + m_DiskLayer.Get(InBucket, HashKey, *BatchHandle.DiskLayerHandle); + return; +} + +void ZenCacheNamespace::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References, - BatchHandle* OptionalBatchHandle) + PutBatchHandle* OptionalBatchHandle) { ZEN_TRACE_CPU("Z$::Namespace::Put"); @@ -202,7 +256,8 @@ ZenCacheNamespace::Put(std::string_view InBucket, ZEN_ASSERT(Value.Value.Size()); - m_DiskLayer.Put(InBucket, HashKey, Value, References, OptionalBatchHandle ? OptionalBatchHandle->DiskLayerHandle : nullptr); + ZenCacheDiskLayer::PutBatchHandle* DiskLayerBatchHandle = OptionalBatchHandle ? OptionalBatchHandle->DiskLayerHandle : nullptr; + m_DiskLayer.Put(InBucket, HashKey, Value, References, DiskLayerBatchHandle); m_WriteCount++; } @@ -500,6 +555,43 @@ ZenCacheStore::PutBatch::~PutBatch() } } +ZenCacheStore::GetBatch::GetBatch(ZenCacheStore& CacheStore, std::string_view InNamespace, std::vector<ZenCacheValue>& OutResult) +: m_CacheStore(CacheStore) +, Results(OutResult) +{ + if (m_Store = m_CacheStore.GetNamespace(InNamespace); m_Store) + { + m_NamespaceBatchHandle = m_Store->BeginGetBatch(OutResult); + } +} + +ZenCacheStore::GetBatch::~GetBatch() +{ + try + { + if (m_Store) + { + ZEN_ASSERT(m_NamespaceBatchHandle); + m_Store->EndGetBatch(m_NamespaceBatchHandle); + + metrics::RequestStats::Scope OpScope(m_CacheStore.m_GetOps, 0); + for (const ZenCacheValue& Result : Results) + { + if (Result.Value) + { + m_CacheStore.m_HitCount++; + OpScope.SetBytes(Result.Value.GetSize()); + } + m_CacheStore.m_MissCount++; + } + } + } + catch (std::exception& Ex) + { + ZEN_ERROR("Exception in cache store when ending batch get operation: '{}'", Ex.what()); + } +} + bool ZenCacheStore::Get(const CacheRequestContext& Context, std::string_view Namespace, @@ -562,6 +654,39 @@ ZenCacheStore::Get(const CacheRequestContext& Context, } void +ZenCacheStore::Get(const CacheRequestContext& Context, + std::string_view Namespace, + std::string_view Bucket, + const IoHash& HashKey, + GetBatch& BatchHandle) +{ + // Ad hoc rejection of known bad usage patterns for DDC bucket names + + if (IsKnownBadBucketName(Bucket)) + { + m_RejectedReadCount++; + return; + } + ZEN_TRACE_CPU("Z$::Get"); + + metrics::RequestStats::Scope OpScope(m_GetOps, 0); + + if (ZenCacheNamespace* Store = GetNamespace(Namespace); Store) + { + Store->Get(Bucket, HashKey, *BatchHandle.m_NamespaceBatchHandle); + return; + } + + ZEN_WARN("request for unknown namespace '{}' in ZenCacheStore::Get [{}], bucket '{}', key '{}'", + Context, + Namespace, + Bucket, + HashKey.ToHexString()); + + m_MissCount++; +} + +void ZenCacheStore::Put(const CacheRequestContext& Context, std::string_view Namespace, std::string_view Bucket, @@ -603,7 +728,7 @@ ZenCacheStore::Put(const CacheRequestContext& Context, if (ZenCacheNamespace* Store = GetNamespace(Namespace); Store) { - ZenCacheNamespace::BatchHandle* BatchHandle = OptionalBatchHandle ? OptionalBatchHandle->m_NamespaceBatchHandle : nullptr; + ZenCacheNamespace::PutBatchHandle* BatchHandle = OptionalBatchHandle ? OptionalBatchHandle->m_NamespaceBatchHandle : nullptr; Store->Put(Bucket, HashKey, Value, References, BatchHandle); m_WriteCount++; return; diff --git a/src/zenstore/include/zenstore/blockstore.h b/src/zenstore/include/zenstore/blockstore.h index a1e497533..ba8259c82 100644 --- a/src/zenstore/include/zenstore/blockstore.h +++ b/src/zenstore/include/zenstore/blockstore.h @@ -107,6 +107,7 @@ private: const std::filesystem::path m_Path; IoBuffer m_IoBuffer; BasicFile m_File; + uint64_t m_CachedFileSize = 0; }; class BlockStoreCompactState; @@ -183,7 +184,8 @@ public: bool IterateBlock(std::span<const BlockStoreLocation> ChunkLocations, std::span<const size_t> ChunkIndexes, const IterateChunksSmallSizeCallback& SmallSizeCallback, - const IterateChunksLargeSizeCallback& LargeSizeCallback); + const IterateChunksLargeSizeCallback& LargeSizeCallback, + uint64_t LargeSizeLimit = 0); void CompactBlocks( const BlockStoreCompactState& CompactState, diff --git a/src/zenstore/include/zenstore/cache/cachedisklayer.h b/src/zenstore/include/zenstore/cache/cachedisklayer.h index 427c338d6..9dee4d3f7 100644 --- a/src/zenstore/include/zenstore/cache/cachedisklayer.h +++ b/src/zenstore/include/zenstore/cache/cachedisklayer.h @@ -164,16 +164,21 @@ public: explicit ZenCacheDiskLayer(GcManager& Gc, JobQueue& JobQueue, const std::filesystem::path& RootDir, const Configuration& Config); ~ZenCacheDiskLayer(); - struct BatchHandle; - BatchHandle* BeginPutBatch(std::vector<bool>& OutResult); - void EndPutBatch(BatchHandle* Batch) noexcept; + struct GetBatchHandle; + GetBatchHandle* BeginGetBatch(std::vector<ZenCacheValue>& OutResult); + void EndGetBatch(GetBatchHandle* Batch) noexcept; + bool Get(std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue); + void Get(std::string_view Bucket, const IoHash& HashKey, GetBatchHandle& BatchHandle); + + struct PutBatchHandle; + PutBatchHandle* BeginPutBatch(std::vector<bool>& OutResult); + void EndPutBatch(PutBatchHandle* Batch) noexcept; - bool Get(std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue); void Put(std::string_view Bucket, const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References, - BatchHandle* OptionalBatchHandle); + PutBatchHandle* OptionalBatchHandle); bool Drop(); bool DropBucket(std::string_view Bucket); void Flush(); @@ -205,12 +210,17 @@ public: ~CacheBucket(); bool OpenOrCreate(std::filesystem::path BucketDir, bool AllowCreate = true); - struct BatchHandle; - BatchHandle* BeginPutBatch(std::vector<bool>& OutResult); - void EndPutBatch(BatchHandle* Batch) noexcept; - bool Get(const IoHash& HashKey, ZenCacheValue& OutValue); - void Put(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References, BatchHandle* OptionalBatchHandle); + struct GetBatchHandle; + GetBatchHandle* BeginGetBatch(std::vector<ZenCacheValue>& OutResult); + void EndGetBatch(GetBatchHandle* Batch) noexcept; + bool Get(const IoHash& HashKey, ZenCacheValue& OutValue); + void Get(const IoHash& HashKey, GetBatchHandle& BatchHandle); + + struct PutBatchHandle; + PutBatchHandle* BeginPutBatch(std::vector<bool>& OutResult); + void EndPutBatch(PutBatchHandle* Batch) noexcept; + void Put(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References, PutBatchHandle* OptionalBatchHandle); uint64_t MemCacheTrim(GcClock::TimePoint ExpireTime); bool Drop(); void Flush(); @@ -342,7 +352,7 @@ public: void PutInlineCacheValue(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References, - BatchHandle* OptionalBatchHandle = nullptr); + PutBatchHandle* OptionalBatchHandle = nullptr); IoBuffer GetInlineCacheValue(const DiskLocation& Loc) const; CacheValueDetails::ValueDetails GetValueDetails(RwLock::SharedLockScope&, const IoHash& Key, PayloadIndex Index) const; diff --git a/src/zenstore/include/zenstore/cache/structuredcachestore.h b/src/zenstore/include/zenstore/cache/structuredcachestore.h index 02bbeed77..7460d01ce 100644 --- a/src/zenstore/include/zenstore/cache/structuredcachestore.h +++ b/src/zenstore/include/zenstore/cache/structuredcachestore.h @@ -80,16 +80,21 @@ public: ZenCacheNamespace(GcManager& Gc, JobQueue& JobQueue, const std::filesystem::path& RootDir, const Configuration& Config); ~ZenCacheNamespace(); - struct BatchHandle; - BatchHandle* BeginPutBatch(std::vector<bool>& OutResults); - void EndPutBatch(BatchHandle* Batch) noexcept; + struct PutBatchHandle; + PutBatchHandle* BeginPutBatch(std::vector<bool>& OutResults); + void EndPutBatch(PutBatchHandle* Batch) noexcept; + + struct GetBatchHandle; + GetBatchHandle* BeginGetBatch(std::vector<ZenCacheValue>& OutResults); + void EndGetBatch(GetBatchHandle* Batch) noexcept; bool Get(std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue); + void Get(std::string_view Bucket, const IoHash& HashKey, GetBatchHandle& OptionalBatchHandle); void Put(std::string_view Bucket, const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References, - BatchHandle* OptionalBatchHandle = nullptr); + PutBatchHandle* OptionalBatchHandle = nullptr); bool DropBucket(std::string_view Bucket); void EnumerateBucketContents(std::string_view Bucket, @@ -199,10 +204,24 @@ public: ~PutBatch(); private: - ZenCacheStore& m_CacheStore; - ZenCacheNamespace* m_Store = nullptr; - ZenCacheNamespace::BatchHandle* m_NamespaceBatchHandle = nullptr; + ZenCacheStore& m_CacheStore; + ZenCacheNamespace* m_Store = nullptr; + ZenCacheNamespace::PutBatchHandle* m_NamespaceBatchHandle = nullptr; + + friend class ZenCacheStore; + }; + + class GetBatch + { + public: + GetBatch(ZenCacheStore& CacheStore, std::string_view Namespace, std::vector<ZenCacheValue>& OutResult); + ~GetBatch(); + private: + ZenCacheStore& m_CacheStore; + ZenCacheNamespace* m_Store = nullptr; + ZenCacheNamespace::GetBatchHandle* m_NamespaceBatchHandle = nullptr; + std::vector<ZenCacheValue>& Results; friend class ZenCacheStore; }; @@ -212,6 +231,12 @@ public: const IoHash& HashKey, ZenCacheValue& OutValue); + void Get(const CacheRequestContext& Context, + std::string_view Namespace, + std::string_view Bucket, + const IoHash& HashKey, + GetBatch& BatchHandle); + void Put(const CacheRequestContext& Context, std::string_view Namespace, std::string_view Bucket, |