diff options
| author | Dan Engelbrecht <[email protected]> | 2024-12-17 14:20:48 +0100 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2024-12-17 14:20:48 +0100 |
| commit | 48657d2fb6c9c709ec29264a609350c7b4a541f9 (patch) | |
| tree | a2c1997a99e9659ee038663e1a8da4b56ec01ad5 /src | |
| parent | remove all referenced attachments in op from pending chunk references (#267) (diff) | |
| download | zen-48657d2fb6c9c709ec29264a609350c7b4a541f9.tar.xz zen-48657d2fb6c9c709ec29264a609350c7b4a541f9.zip | |
batch fetch record cache values (#266)
- Improvement: Batch fetch record attachments when appropriate
- Improvement: Reduce memory buffer allocation in BlockStore::IterateBlock
- Improvement: Tweaked BlockStore::IterateBlock logic when to use threaded work (at least 4 chunks requested)
- Bugfix: CasContainerStrategy::IterateChunks could give wrong payload/index when requesting 1 or 2 chunks
Diffstat (limited to 'src')
| -rw-r--r-- | src/zenstore/blockstore.cpp | 9 | ||||
| -rw-r--r-- | src/zenstore/cache/cacherpc.cpp | 67 | ||||
| -rw-r--r-- | src/zenstore/compactcas.cpp | 35 |
3 files changed, 90 insertions, 21 deletions
diff --git a/src/zenstore/blockstore.cpp b/src/zenstore/blockstore.cpp index 9ad672060..3974fb989 100644 --- a/src/zenstore/blockstore.cpp +++ b/src/zenstore/blockstore.cpp @@ -809,8 +809,8 @@ BlockStore::IterateBlock(std::span<const BlockStoreLocation> ChunkLocations, ZEN_ASSERT(BlockFile); InsertLock.ReleaseNow(); - IoBuffer ReadBuffer{IterateSmallChunkWindowSize}; - void* BufferBase = ReadBuffer.MutableData(); + IoBuffer ReadBuffer; + void* BufferBase = nullptr; size_t LocationIndexOffset = 0; while (LocationIndexOffset < ChunkIndexes.size()) @@ -825,6 +825,11 @@ BlockStore::IterateBlock(std::span<const BlockStoreLocation> ChunkLocations, size_t LastChunkIndex = ChunkIndexes[LocationIndexOffset + RangeCount - 1]; const BlockStoreLocation& LastLocation = ChunkLocations[LastChunkIndex]; uint64_t Size = LastLocation.Offset + LastLocation.Size - FirstLocation.Offset; + if (ReadBuffer.GetSize() < Size) + { + ReadBuffer = IoBuffer(Min(Size * 2, IterateSmallChunkWindowSize)); + BufferBase = ReadBuffer.MutableData(); + } BlockFile->Read(BufferBase, Size, FirstLocation.Offset); for (size_t RangeIndex = 0; RangeIndex < RangeCount; ++RangeIndex) { diff --git a/src/zenstore/cache/cacherpc.cpp b/src/zenstore/cache/cacherpc.cpp index e6b6be525..0a1ed0e09 100644 --- a/src/zenstore/cache/cacherpc.cpp +++ b/src/zenstore/cache/cacherpc.cpp @@ -16,6 +16,7 @@ #include <zenstore/cidstore.h> #include <zenutil/cache/cacherequests.h> #include <zenutil/packageformat.h> +#include <zenutil/workerpools.h> #include <zencore/memory/llm.h> @@ -561,7 +562,17 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb Request.RecordObject = CbObjectView(Request.RecordCacheValue.GetData()); ParseValues(Request); - Request.Complete = true; + Request.Complete = true; + size_t ValueCount = Request.Values.size(); + std::vector<IoHash> CidHashes; + std::vector<size_t> RequestValueIndexes; + const bool DoBatch = ValueCount > 7; + if (DoBatch) + { + CidHashes.reserve(ValueCount); + RequestValueIndexes.reserve(ValueCount); + } + size_t ValueIndex = 0; for (ValueRequestData& Value : Request.Values) { CachePolicy ValuePolicy = Value.DownstreamPolicy; @@ -596,6 +607,11 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb Request.Complete = false; } } + else if (DoBatch) + { + CidHashes.push_back(Value.ContentId); + RequestValueIndexes.push_back(ValueIndex); + } else { if (IoBuffer Chunk = m_CidStore.FindChunkByCid(Value.ContentId)) @@ -611,7 +627,6 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb ZEN_WARN("Skipping invalid chunk in local cache '{}'", Value.ContentId); } } - if (EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote)) { NeedUpstreamAttachment = true; @@ -619,6 +634,53 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb } Request.Complete = false; } + ValueIndex++; + } + if (!RequestValueIndexes.empty()) + { + m_CidStore.IterateChunks( + CidHashes, + [this, &Request, ValueCount, &RequestValueIndexes](size_t Index, const IoBuffer& Payload) -> bool { + try + { + const size_t ValueIndex = RequestValueIndexes[Index]; + ValueRequestData& Value = Request.Values[ValueIndex]; + if (Payload) + { + Value.Payload = CompressedBuffer::FromCompressedNoValidate(IoBuffer(Payload)).MakeOwned(); + if (Value.Payload) + { + Value.Exists = true; + } + else + { + ZEN_WARN("Skipping invalid chunk in local cache '{}'", Value.ContentId); + } + } + } + catch (const std::exception& Ex) + { + ZEN_ERROR("CacheRpcHandler::HandleRpcGetCacheRecords IterateChunks callback failed with '{}'", Ex.what()); + } + return true; + }, + &GetSmallWorkerPool(EWorkloadType::Burst), + 64u * 1024u); + + for (size_t Index : RequestValueIndexes) + { + ValueRequestData& Value = Request.Values[Index]; + if (!Value.Exists) + { + const CachePolicy ValuePolicy = Value.DownstreamPolicy; + if (EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote)) + { + NeedUpstreamAttachment = true; + Value.ReadFromUpstream = true; + } + Request.Complete = false; + } + } } } } @@ -1415,6 +1477,7 @@ CacheRpcHandler::GetLocalCacheRecords(const CacheRequestContext& Context, using namespace cache::detail; const bool HasUpstream = m_UpstreamCache.IsActive(); + // TODO: BatchGet records? std::vector<CacheKeyRequest*> UpstreamRecordRequests; for (size_t RecordIndex = 0; RecordIndex < Records.size(); ++RecordIndex) { diff --git a/src/zenstore/compactcas.cpp b/src/zenstore/compactcas.cpp index 50af7246e..30cf998f8 100644 --- a/src/zenstore/compactcas.cpp +++ b/src/zenstore/compactcas.cpp @@ -345,12 +345,13 @@ CasContainerStrategy::IterateChunks(std::span<IoHash> ChunkHashes, } } } - if (FoundChunkLocations.size() < 3) + if (FoundChunkLocations.size() < 4) { - for (size_t ChunkIndex : FoundChunkIndexes) + for (size_t Index = 0; Index < FoundChunkIndexes.size(); Index++) { - IoBuffer Chunk = m_BlockStore.TryGetChunk(FoundChunkLocations[ChunkIndex]); - if (!AsyncCallback(ChunkIndex, Chunk)) + IoBuffer Chunk = m_BlockStore.TryGetChunk(FoundChunkLocations[Index]); + size_t OuterIndex = FoundChunkIndexes[Index]; + if (!AsyncCallback(OuterIndex, Chunk)) { return false; } @@ -359,6 +360,18 @@ CasContainerStrategy::IterateChunks(std::span<IoHash> ChunkHashes, } auto DoOneBlock = [&](std::span<const size_t> ChunkIndexes) { + if (ChunkIndexes.size() < 4) + { + for (size_t ChunkIndex : ChunkIndexes) + { + IoBuffer Chunk = m_BlockStore.TryGetChunk(FoundChunkLocations[ChunkIndex]); + if (!AsyncCallback(FoundChunkIndexes[ChunkIndex], Chunk)) + { + return false; + } + } + return true; + } return m_BlockStore.IterateBlock( FoundChunkLocations, ChunkIndexes, @@ -378,19 +391,7 @@ CasContainerStrategy::IterateChunks(std::span<IoHash> ChunkHashes, Latch WorkLatch(1); std::atomic_bool AsyncContinue = true; bool Continue = m_BlockStore.IterateChunks(FoundChunkLocations, [&](uint32_t BlockIndex, std::span<const size_t> ChunkIndexes) { - if (ChunkIndexes.size() < 3) - { - for (size_t ChunkIndex : ChunkIndexes) - { - IoBuffer Chunk = m_BlockStore.TryGetChunk(FoundChunkLocations[ChunkIndex]); - if (!AsyncCallback(FoundChunkIndexes[ChunkIndex], Chunk)) - { - return false; - } - } - return true; - } - else if (OptionalWorkerPool) + if (OptionalWorkerPool && (ChunkIndexes.size() > 3)) { WorkLatch.AddCount(1); OptionalWorkerPool->ScheduleWork([&, ChunkIndexes = std::vector<size_t>(ChunkIndexes.begin(), ChunkIndexes.end())]() { |