aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2024-12-17 14:20:48 +0100
committerGitHub Enterprise <[email protected]>2024-12-17 14:20:48 +0100
commit48657d2fb6c9c709ec29264a609350c7b4a541f9 (patch)
treea2c1997a99e9659ee038663e1a8da4b56ec01ad5 /src
parentremove all referenced attachments in op from pending chunk references (#267) (diff)
downloadzen-48657d2fb6c9c709ec29264a609350c7b4a541f9.tar.xz
zen-48657d2fb6c9c709ec29264a609350c7b4a541f9.zip
batch fetch record cache values (#266)
- Improvement: Batch fetch record attachments when appropriate - Improvement: Reduce memory buffer allocation in BlockStore::IterateBlock - Improvement: Tweaked BlockStore::IterateBlock logic for when to use threaded work (requires at least 4 chunks requested) - Bugfix: CasContainerStrategy::IterateChunks could give wrong payload/index when requesting 1 or 2 chunks
Diffstat (limited to 'src')
-rw-r--r--src/zenstore/blockstore.cpp9
-rw-r--r--src/zenstore/cache/cacherpc.cpp67
-rw-r--r--src/zenstore/compactcas.cpp35
3 files changed, 90 insertions, 21 deletions
diff --git a/src/zenstore/blockstore.cpp b/src/zenstore/blockstore.cpp
index 9ad672060..3974fb989 100644
--- a/src/zenstore/blockstore.cpp
+++ b/src/zenstore/blockstore.cpp
@@ -809,8 +809,8 @@ BlockStore::IterateBlock(std::span<const BlockStoreLocation> ChunkLocations,
ZEN_ASSERT(BlockFile);
InsertLock.ReleaseNow();
- IoBuffer ReadBuffer{IterateSmallChunkWindowSize};
- void* BufferBase = ReadBuffer.MutableData();
+ IoBuffer ReadBuffer;
+ void* BufferBase = nullptr;
size_t LocationIndexOffset = 0;
while (LocationIndexOffset < ChunkIndexes.size())
@@ -825,6 +825,11 @@ BlockStore::IterateBlock(std::span<const BlockStoreLocation> ChunkLocations,
size_t LastChunkIndex = ChunkIndexes[LocationIndexOffset + RangeCount - 1];
const BlockStoreLocation& LastLocation = ChunkLocations[LastChunkIndex];
uint64_t Size = LastLocation.Offset + LastLocation.Size - FirstLocation.Offset;
+ if (ReadBuffer.GetSize() < Size)
+ {
+ ReadBuffer = IoBuffer(Min(Size * 2, IterateSmallChunkWindowSize));
+ BufferBase = ReadBuffer.MutableData();
+ }
BlockFile->Read(BufferBase, Size, FirstLocation.Offset);
for (size_t RangeIndex = 0; RangeIndex < RangeCount; ++RangeIndex)
{
diff --git a/src/zenstore/cache/cacherpc.cpp b/src/zenstore/cache/cacherpc.cpp
index e6b6be525..0a1ed0e09 100644
--- a/src/zenstore/cache/cacherpc.cpp
+++ b/src/zenstore/cache/cacherpc.cpp
@@ -16,6 +16,7 @@
#include <zenstore/cidstore.h>
#include <zenutil/cache/cacherequests.h>
#include <zenutil/packageformat.h>
+#include <zenutil/workerpools.h>
#include <zencore/memory/llm.h>
@@ -561,7 +562,17 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb
Request.RecordObject = CbObjectView(Request.RecordCacheValue.GetData());
ParseValues(Request);
- Request.Complete = true;
+ Request.Complete = true;
+ size_t ValueCount = Request.Values.size();
+ std::vector<IoHash> CidHashes;
+ std::vector<size_t> RequestValueIndexes;
+ const bool DoBatch = ValueCount > 7;
+ if (DoBatch)
+ {
+ CidHashes.reserve(ValueCount);
+ RequestValueIndexes.reserve(ValueCount);
+ }
+ size_t ValueIndex = 0;
for (ValueRequestData& Value : Request.Values)
{
CachePolicy ValuePolicy = Value.DownstreamPolicy;
@@ -596,6 +607,11 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb
Request.Complete = false;
}
}
+ else if (DoBatch)
+ {
+ CidHashes.push_back(Value.ContentId);
+ RequestValueIndexes.push_back(ValueIndex);
+ }
else
{
if (IoBuffer Chunk = m_CidStore.FindChunkByCid(Value.ContentId))
@@ -611,7 +627,6 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb
ZEN_WARN("Skipping invalid chunk in local cache '{}'", Value.ContentId);
}
}
-
if (EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote))
{
NeedUpstreamAttachment = true;
@@ -619,6 +634,53 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb
}
Request.Complete = false;
}
+ ValueIndex++;
+ }
+ if (!RequestValueIndexes.empty())
+ {
+ m_CidStore.IterateChunks(
+ CidHashes,
+ [this, &Request, ValueCount, &RequestValueIndexes](size_t Index, const IoBuffer& Payload) -> bool {
+ try
+ {
+ const size_t ValueIndex = RequestValueIndexes[Index];
+ ValueRequestData& Value = Request.Values[ValueIndex];
+ if (Payload)
+ {
+ Value.Payload = CompressedBuffer::FromCompressedNoValidate(IoBuffer(Payload)).MakeOwned();
+ if (Value.Payload)
+ {
+ Value.Exists = true;
+ }
+ else
+ {
+ ZEN_WARN("Skipping invalid chunk in local cache '{}'", Value.ContentId);
+ }
+ }
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("CacheRpcHandler::HandleRpcGetCacheRecords IterateChunks callback failed with '{}'", Ex.what());
+ }
+ return true;
+ },
+ &GetSmallWorkerPool(EWorkloadType::Burst),
+ 64u * 1024u);
+
+ for (size_t Index : RequestValueIndexes)
+ {
+ ValueRequestData& Value = Request.Values[Index];
+ if (!Value.Exists)
+ {
+ const CachePolicy ValuePolicy = Value.DownstreamPolicy;
+ if (EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote))
+ {
+ NeedUpstreamAttachment = true;
+ Value.ReadFromUpstream = true;
+ }
+ Request.Complete = false;
+ }
+ }
}
}
}
@@ -1415,6 +1477,7 @@ CacheRpcHandler::GetLocalCacheRecords(const CacheRequestContext& Context,
using namespace cache::detail;
const bool HasUpstream = m_UpstreamCache.IsActive();
+ // TODO: BatchGet records?
std::vector<CacheKeyRequest*> UpstreamRecordRequests;
for (size_t RecordIndex = 0; RecordIndex < Records.size(); ++RecordIndex)
{
diff --git a/src/zenstore/compactcas.cpp b/src/zenstore/compactcas.cpp
index 50af7246e..30cf998f8 100644
--- a/src/zenstore/compactcas.cpp
+++ b/src/zenstore/compactcas.cpp
@@ -345,12 +345,13 @@ CasContainerStrategy::IterateChunks(std::span<IoHash> ChunkHashes,
}
}
}
- if (FoundChunkLocations.size() < 3)
+ if (FoundChunkLocations.size() < 4)
{
- for (size_t ChunkIndex : FoundChunkIndexes)
+ for (size_t Index = 0; Index < FoundChunkIndexes.size(); Index++)
{
- IoBuffer Chunk = m_BlockStore.TryGetChunk(FoundChunkLocations[ChunkIndex]);
- if (!AsyncCallback(ChunkIndex, Chunk))
+ IoBuffer Chunk = m_BlockStore.TryGetChunk(FoundChunkLocations[Index]);
+ size_t OuterIndex = FoundChunkIndexes[Index];
+ if (!AsyncCallback(OuterIndex, Chunk))
{
return false;
}
@@ -359,6 +360,18 @@ CasContainerStrategy::IterateChunks(std::span<IoHash> ChunkHashes,
}
auto DoOneBlock = [&](std::span<const size_t> ChunkIndexes) {
+ if (ChunkIndexes.size() < 4)
+ {
+ for (size_t ChunkIndex : ChunkIndexes)
+ {
+ IoBuffer Chunk = m_BlockStore.TryGetChunk(FoundChunkLocations[ChunkIndex]);
+ if (!AsyncCallback(FoundChunkIndexes[ChunkIndex], Chunk))
+ {
+ return false;
+ }
+ }
+ return true;
+ }
return m_BlockStore.IterateBlock(
FoundChunkLocations,
ChunkIndexes,
@@ -378,19 +391,7 @@ CasContainerStrategy::IterateChunks(std::span<IoHash> ChunkHashes,
Latch WorkLatch(1);
std::atomic_bool AsyncContinue = true;
bool Continue = m_BlockStore.IterateChunks(FoundChunkLocations, [&](uint32_t BlockIndex, std::span<const size_t> ChunkIndexes) {
- if (ChunkIndexes.size() < 3)
- {
- for (size_t ChunkIndex : ChunkIndexes)
- {
- IoBuffer Chunk = m_BlockStore.TryGetChunk(FoundChunkLocations[ChunkIndex]);
- if (!AsyncCallback(FoundChunkIndexes[ChunkIndex], Chunk))
- {
- return false;
- }
- }
- return true;
- }
- else if (OptionalWorkerPool)
+ if (OptionalWorkerPool && (ChunkIndexes.size() > 3))
{
WorkLatch.AddCount(1);
OptionalWorkerPool->ScheduleWork([&, ChunkIndexes = std::vector<size_t>(ChunkIndexes.begin(), ChunkIndexes.end())]() {