aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2024-06-04 19:30:34 +0200
committerGitHub Enterprise <[email protected]>2024-06-04 19:30:34 +0200
commitbbe46452530a98a0bd36c0024d4f3f914ae23604 (patch)
treee9c901a6ec68d087bc7e746b38d1573b2999a0ef /src
parentUse a smaller thread pool for network operations when doing oplog import to r... (diff)
downloadzen-bbe46452530a98a0bd36c0024d4f3f914ae23604.tar.xz
zen-bbe46452530a98a0bd36c0024d4f3f914ae23604.zip
add batching of CacheStore requests for GetCacheValues/GetCacheChunks (#90)
* cache file size of block on open * add ability to control size limit for small chunk callback when iterating block * Add batch fetch of cache values in the GetCacheValues request
Diffstat (limited to 'src')
-rw-r--r--src/zenstore/blockstore.cpp54
-rw-r--r--src/zenstore/cache/cachedisklayer.cpp360
-rw-r--r--src/zenstore/cache/cacherpc.cpp166
-rw-r--r--src/zenstore/cache/structuredcachestore.cpp145
-rw-r--r--src/zenstore/include/zenstore/blockstore.h4
-rw-r--r--src/zenstore/include/zenstore/cache/cachedisklayer.h32
-rw-r--r--src/zenstore/include/zenstore/cache/structuredcachestore.h39
7 files changed, 679 insertions, 121 deletions
diff --git a/src/zenstore/blockstore.cpp b/src/zenstore/blockstore.cpp
index aef7b348b..6e289409c 100644
--- a/src/zenstore/blockstore.cpp
+++ b/src/zenstore/blockstore.cpp
@@ -64,7 +64,8 @@ BlockStoreFile::Open()
return true;
});
void* FileHandle = m_File.Handle();
- m_IoBuffer = IoBuffer(IoBuffer::File, FileHandle, 0, m_File.FileSize(), /*IsWholeFile*/ true);
+ m_CachedFileSize = m_File.FileSize();
+ m_IoBuffer = IoBuffer(IoBuffer::File, FileHandle, 0, m_CachedFileSize, /*IsWholeFile*/ true);
}
void
@@ -100,7 +101,7 @@ BlockStoreFile::Create(uint64_t InitialSize)
uint64_t
BlockStoreFile::FileSize()
{
- return m_File.FileSize();
+ return m_CachedFileSize == 0 ? m_File.FileSize() : m_CachedFileSize;
}
void
@@ -156,7 +157,8 @@ BlockStoreFile::IsOpen() const
return !!m_IoBuffer;
}
-constexpr uint64_t IterateSmallChunkWindowSize = 2 * 1024 * 1024;
+constexpr uint64_t DefaultIterateSmallChunkWindowSize = 2 * 1024 * 1024;
+constexpr uint64_t IterateSmallChunkMaxGapSize = 4 * 1024;
BlockStore::BlockStore()
{
@@ -1041,20 +1043,33 @@ bool
BlockStore::IterateBlock(std::span<const BlockStoreLocation> ChunkLocations,
std::span<const size_t> InChunkIndexes,
const IterateChunksSmallSizeCallback& SmallSizeCallback,
- const IterateChunksLargeSizeCallback& LargeSizeCallback)
+ const IterateChunksLargeSizeCallback& LargeSizeCallback,
+ uint64_t LargeSizeLimit)
{
if (InChunkIndexes.empty())
{
return true;
}
+ uint64_t IterateSmallChunkWindowSize = Max(DefaultIterateSmallChunkWindowSize, LargeSizeLimit);
+ if (LargeSizeLimit == 0u)
+ {
+ LargeSizeLimit = IterateSmallChunkWindowSize;
+ }
+ else
+ {
+ IterateSmallChunkWindowSize =
+ Min((LargeSizeLimit + IterateSmallChunkMaxGapSize) * ChunkLocations.size(), IterateSmallChunkWindowSize);
+ }
+
uint32_t BlockIndex = ChunkLocations[InChunkIndexes[0]].BlockIndex;
std::vector<size_t> ChunkIndexes(InChunkIndexes.begin(), InChunkIndexes.end());
std::sort(ChunkIndexes.begin(), ChunkIndexes.end(), [&](size_t IndexA, size_t IndexB) -> bool {
return ChunkLocations[IndexA].Offset < ChunkLocations[IndexB].Offset;
});
- auto GetNextRange = [&ChunkLocations](std::span<const size_t> ChunkIndexes, size_t StartIndexOffset) -> size_t {
+ auto GetNextRange = [LargeSizeLimit, IterateSmallChunkWindowSize, &ChunkLocations](std::span<const size_t> ChunkIndexes,
+ size_t StartIndexOffset) -> size_t {
size_t ChunkCount = 0;
size_t StartIndex = ChunkIndexes[StartIndexOffset];
const BlockStoreLocation& StartLocation = ChunkLocations[StartIndex];
@@ -1065,7 +1080,11 @@ BlockStore::IterateBlock(std::span<const BlockStoreLocation> ChunkLocations,
size_t NextIndex = ChunkIndexes[StartIndexOffset + ChunkCount];
const BlockStoreLocation& Location = ChunkLocations[NextIndex];
ZEN_ASSERT(Location.BlockIndex == StartLocation.BlockIndex);
- if (Location.Offset >= (LastEnd + (4u * 1024u)))
+ if (Location.Size > LargeSizeLimit)
+ {
+ break;
+ }
+ if (Location.Offset >= (LastEnd + IterateSmallChunkMaxGapSize))
{
break;
}
@@ -1097,8 +1116,9 @@ BlockStore::IterateBlock(std::span<const BlockStoreLocation> ChunkLocations,
}
else
{
- const Ref<BlockStoreFile>& BlockFile = FindBlockIt->second;
+ Ref<BlockStoreFile> BlockFile = FindBlockIt->second;
ZEN_ASSERT(BlockFile);
+ InsertLock.ReleaseNow();
IoBuffer ReadBuffer{IterateSmallChunkWindowSize};
void* BufferBase = ReadBuffer.MutableData();
@@ -1129,11 +1149,17 @@ BlockStore::IterateBlock(std::span<const BlockStoreLocation> ChunkLocations,
BlockIndex,
BlockSize);
- SmallSizeCallback(NextChunkIndex, nullptr, 0);
+ if (!SmallSizeCallback(NextChunkIndex, nullptr, 0))
+ {
+ return false;
+ }
continue;
}
void* BufferPtr = &((char*)BufferBase)[ChunkLocation.Offset - FirstLocation.Offset];
- SmallSizeCallback(NextChunkIndex, BufferPtr, ChunkLocation.Size);
+ if (!SmallSizeCallback(NextChunkIndex, BufferPtr, ChunkLocation.Size))
+ {
+ return false;
+ }
}
LocationIndexOffset += RangeCount;
continue;
@@ -1167,7 +1193,7 @@ BlockStore::IterateChunks(const std::span<const BlockStoreLocation>& ChunkLocati
Stopwatch Timer;
auto _ = MakeGuard([&]() {
- ZEN_INFO("Iterated {} chunks from '{}' in {}", ChunkLocations.size(), m_BlocksBasePath, NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ ZEN_DEBUG("Iterated {} chunks from '{}' in {}", ChunkLocations.size(), m_BlocksBasePath, NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
});
ZEN_LOG_SCOPE("iterating chunks from '{}'", m_BlocksBasePath);
@@ -1838,7 +1864,7 @@ TEST_CASE("blockstore.iterate.chunks")
auto RootDirectory = TempDir.Path();
BlockStore Store;
- Store.Initialize(RootDirectory / "store", IterateSmallChunkWindowSize * 2, 1024);
+ Store.Initialize(RootDirectory / "store", DefaultIterateSmallChunkWindowSize * 2, 1024);
IoBuffer BadChunk = Store.TryGetChunk({.BlockIndex = 0, .Offset = 0, .Size = 512});
CHECK(!BadChunk);
@@ -1849,13 +1875,13 @@ TEST_CASE("blockstore.iterate.chunks")
BlockStoreLocation SecondChunkLocation = WriteStringAsChunk(Store, SecondChunkData, 4);
Store.Flush(/*ForceNewBlock*/ false);
- std::string VeryLargeChunk(IterateSmallChunkWindowSize * 2, 'L');
+ std::string VeryLargeChunk(DefaultIterateSmallChunkWindowSize * 2, 'L');
BlockStoreLocation VeryLargeChunkLocation = WriteStringAsChunk(Store, VeryLargeChunk, 4);
BlockStoreLocation BadLocationZeroSize = {.BlockIndex = 0, .Offset = 0, .Size = 0};
BlockStoreLocation BadLocationOutOfRange = {.BlockIndex = 0,
- .Offset = IterateSmallChunkWindowSize,
- .Size = IterateSmallChunkWindowSize * 2};
+ .Offset = DefaultIterateSmallChunkWindowSize,
+ .Size = DefaultIterateSmallChunkWindowSize * 2};
BlockStoreLocation BadBlockIndex = {.BlockIndex = 0xfffff, .Offset = 1024, .Size = 1024};
WorkerThreadPool WorkerPool(4);
diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp
index d67e8d6c8..9dd2e4a67 100644
--- a/src/zenstore/cache/cachedisklayer.cpp
+++ b/src/zenstore/cache/cachedisklayer.cpp
@@ -1170,9 +1170,9 @@ ZenCacheDiskLayer::CacheBucket::GetStandaloneCacheValue(const DiskLocation& Loc,
return {};
}
-struct ZenCacheDiskLayer::CacheBucket::BatchHandle
+struct ZenCacheDiskLayer::CacheBucket::PutBatchHandle
{
- BatchHandle(std::vector<bool>& OutResults) : OutResults(OutResults) {}
+ PutBatchHandle(std::vector<bool>& OutResults) : OutResults(OutResults) {}
struct Entry
{
std::vector<IoHash> HashKeyAndReferences;
@@ -1184,14 +1184,14 @@ struct ZenCacheDiskLayer::CacheBucket::BatchHandle
std::vector<bool>& OutResults;
};
-ZenCacheDiskLayer::CacheBucket::BatchHandle*
+ZenCacheDiskLayer::CacheBucket::PutBatchHandle*
ZenCacheDiskLayer::CacheBucket::BeginPutBatch(std::vector<bool>& OutResults)
{
- return new BatchHandle(OutResults);
+ return new PutBatchHandle(OutResults);
}
void
-ZenCacheDiskLayer::CacheBucket::EndPutBatch(BatchHandle* Batch) noexcept
+ZenCacheDiskLayer::CacheBucket::EndPutBatch(PutBatchHandle* Batch) noexcept
{
try
{
@@ -1272,6 +1272,232 @@ ZenCacheDiskLayer::CacheBucket::EndPutBatch(BatchHandle* Batch) noexcept
}
}
+struct ZenCacheDiskLayer::CacheBucket::GetBatchHandle
+{
+ GetBatchHandle(std::vector<ZenCacheValue>& OutResults) : OutResults(OutResults)
+ {
+ Keys.reserve(OutResults.capacity());
+ ResultIndexes.reserve(OutResults.capacity());
+ }
+
+ std::vector<IoHash> Keys;
+ std::vector<size_t> ResultIndexes;
+
+ std::vector<ZenCacheValue>& OutResults;
+};
+
+ZenCacheDiskLayer::CacheBucket::GetBatchHandle*
+ZenCacheDiskLayer::CacheBucket::BeginGetBatch(std::vector<ZenCacheValue>& OutResult)
+{
+ ZEN_TRACE_CPU("Z$::Bucket::GetBatched");
+ return new GetBatchHandle(OutResult);
+}
+
+void
+ZenCacheDiskLayer::CacheBucket::EndGetBatch(GetBatchHandle* Batch) noexcept
+{
+ ZEN_TRACE_CPU("Z$::Bucket::GetBatched");
+
+ ZEN_ASSERT(Batch);
+ ZEN_ASSERT(Batch->Keys.size() == Batch->ResultIndexes.size());
+
+ metrics::RequestStats::Scope StatsScope(m_GetOps, 0);
+
+ if (!Batch->ResultIndexes.empty())
+ {
+ std::vector<DiskLocation> StandaloneDiskLocations;
+ std::vector<size_t> StandaloneKeyIndexes;
+ std::vector<DiskLocation> InlineDiskLocations;
+ std::vector<BlockStoreLocation> InlineBlockLocations;
+ std::vector<size_t> InlineKeyIndexes;
+ std::vector<bool> FillRawHashAndRawSize(Batch->Keys.size(), false);
+ {
+ RwLock::SharedLockScope IndexLock(m_IndexLock);
+ for (size_t KeyIndex = 0; KeyIndex < Batch->Keys.size(); KeyIndex++)
+ {
+ const IoHash& HashKey = Batch->Keys[KeyIndex];
+ auto It = m_Index.find(HashKey);
+ if (It != m_Index.end())
+ {
+ size_t ResultIndex = Batch->ResultIndexes[KeyIndex];
+ ZenCacheValue& OutValue = Batch->OutResults[ResultIndex];
+
+ const PayloadIndex PayloadIdx = It.value();
+ m_AccessTimes[PayloadIdx] = GcClock::TickCount();
+ const BucketPayload& Payload = m_Payloads[PayloadIdx];
+ const DiskLocation& Location = Payload.Location;
+
+ FillRawHashAndRawSize[KeyIndex] = (!Location.IsFlagSet(DiskLocation::kStructured)) && (Location.Size() > 0);
+ if (Payload.MetaData)
+ {
+ const BucketMetaData& MetaData = m_MetaDatas[Payload.MetaData];
+ OutValue.RawHash = MetaData.RawHash;
+ OutValue.RawSize = MetaData.RawSize;
+ FillRawHashAndRawSize[KeyIndex] = false;
+ }
+
+ if (Payload.MemCached)
+ {
+ ZEN_ASSERT(!FillRawHashAndRawSize[KeyIndex]);
+ OutValue.Value = m_MemCachedPayloads[Payload.MemCached].Payload;
+ m_MemoryHitCount++;
+ }
+ else
+ {
+ if (m_Configuration.MemCacheSizeThreshold > 0)
+ {
+ m_MemoryMissCount++;
+ }
+ if (Location.IsFlagSet(DiskLocation::kStandaloneFile))
+ {
+ StandaloneDiskLocations.push_back(Location);
+ StandaloneKeyIndexes.push_back(KeyIndex);
+ }
+ else
+ {
+ InlineDiskLocations.push_back(Location);
+ InlineBlockLocations.emplace_back(Location.GetBlockLocation(m_Configuration.PayloadAlignment));
+ InlineKeyIndexes.push_back(KeyIndex);
+ }
+ }
+ }
+ }
+ }
+
+ auto FillOne = [&](const DiskLocation& Location, size_t KeyIndex, IoBuffer&& Value) {
+ if (!Value)
+ {
+ return;
+ }
+ size_t ResultIndex = Batch->ResultIndexes[KeyIndex];
+ ZenCacheValue& OutValue = Batch->OutResults[ResultIndex];
+ OutValue.Value = std::move(Value);
+ OutValue.Value.SetContentType(Location.GetContentType());
+
+ bool AddToMemCache = false;
+ bool SetMetaInfo = FillRawHashAndRawSize[KeyIndex];
+ if (m_Configuration.MemCacheSizeThreshold > 0)
+ {
+ size_t ValueSize = OutValue.Value.GetSize();
+ if (OutValue.Value && ValueSize <= m_Configuration.MemCacheSizeThreshold)
+ {
+ OutValue.Value = IoBufferBuilder::ReadFromFileMaybe(OutValue.Value);
+ AddToMemCache = true;
+ }
+ }
+
+ if (SetMetaInfo)
+ {
+ ZEN_TRACE_CPU("Z$::Bucket::EndGetBatch::MetaData");
+ if (Location.IsFlagSet(DiskLocation::kCompressed))
+ {
+ if (!CompressedBuffer::ValidateCompressedHeader(OutValue.Value, OutValue.RawHash, OutValue.RawSize))
+ {
+ OutValue = ZenCacheValue{};
+ }
+ }
+ else
+ {
+ OutValue.RawHash = IoHash::HashBuffer(OutValue.Value);
+ OutValue.RawSize = OutValue.Value.GetSize();
+ }
+ }
+
+ if (SetMetaInfo || AddToMemCache)
+ {
+ ZEN_TRACE_CPU("Z$::Bucket::EndGetBatch::MemCache");
+ const IoHash& Key = Batch->Keys[KeyIndex];
+ RwLock::ExclusiveLockScope UpdateIndexLock(m_IndexLock);
+ {
+ if (auto UpdateIt = m_Index.find(Key); UpdateIt != m_Index.end())
+ {
+ BucketPayload& Payload = m_Payloads[UpdateIt->second];
+
+ // Only update if it has not already been updated by other thread
+ if (!Payload.MetaData && SetMetaInfo)
+ {
+ SetMetaData(UpdateIndexLock, Payload, {.RawSize = OutValue.RawSize, .RawHash = OutValue.RawHash});
+ }
+ if (!Payload.MemCached && AddToMemCache)
+ {
+ SetMemCachedData(UpdateIndexLock, UpdateIt->second, OutValue.Value);
+ }
+ }
+ }
+ }
+ };
+
+ // We don't want to read into memory if they are to big since we might only want to touch the compressed
+ // header before sending it along
+ if (!InlineDiskLocations.empty())
+ {
+ ZEN_TRACE_CPU("Z$::Bucket::EndGetBatch::ReadInline");
+ m_BlockStore.IterateChunks(InlineBlockLocations, [&](uint32_t, std::span<const size_t> ChunkIndexes) -> bool {
+ const uint64_t LargeChunkSizeLimit = Max(m_Configuration.MemCacheSizeThreshold, 32u * 1024u);
+ m_BlockStore.IterateBlock(
+ InlineBlockLocations,
+ ChunkIndexes,
+ [this, &FillOne, &InlineDiskLocations, &InlineKeyIndexes](size_t ChunkIndex, const void* Data, uint64_t Size) -> bool {
+ if (Data != nullptr)
+ {
+ FillOne(InlineDiskLocations[ChunkIndex],
+ InlineKeyIndexes[ChunkIndex],
+ IoBufferBuilder::MakeCloneFromMemory(Data, Size));
+ }
+ return true;
+ },
+ [this, &FillOne, &InlineDiskLocations, &InlineKeyIndexes](size_t ChunkIndex,
+ BlockStoreFile& File,
+ uint64_t Offset,
+ uint64_t Size) -> bool {
+ FillOne(InlineDiskLocations[ChunkIndex], InlineKeyIndexes[ChunkIndex], File.GetChunk(Offset, Size));
+ return true;
+ },
+ LargeChunkSizeLimit);
+ return true;
+ });
+ }
+
+ if (!StandaloneDiskLocations.empty())
+ {
+ ZEN_TRACE_CPU("Z$::Bucket::EndGetBatch::ReadStandalone");
+ for (size_t Index = 0; Index < StandaloneDiskLocations.size(); Index++)
+ {
+ size_t KeyIndex = StandaloneKeyIndexes[Index];
+ const DiskLocation& Location = StandaloneDiskLocations[Index];
+ FillOne(Location, KeyIndex, GetStandaloneCacheValue(Location, Batch->Keys[KeyIndex]));
+ }
+ }
+
+ for (size_t ResultIndex : Batch->ResultIndexes)
+ {
+ bool Hit = !!Batch->OutResults[ResultIndex].Value;
+ if (Hit)
+ {
+ m_DiskHitCount++;
+ StatsScope.SetBytes(Batch->OutResults[ResultIndex].Value.GetSize());
+ }
+ else
+ {
+ m_DiskMissCount++;
+ if (m_Configuration.MemCacheSizeThreshold > 0)
+ {
+ m_MemoryMissCount++;
+ }
+ }
+ }
+ }
+}
+
+void
+ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, GetBatchHandle& BatchHandle)
+{
+ ZEN_TRACE_CPU("Z$::Bucket::GetBatched");
+ BatchHandle.Keys.push_back(HashKey);
+ BatchHandle.ResultIndexes.push_back(BatchHandle.OutResults.size());
+ BatchHandle.OutResults.push_back({});
+}
+
bool
ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutValue)
{
@@ -1379,6 +1605,7 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal
}
}
}
+
if (OutValue.Value)
{
m_DiskHitCount++;
@@ -1396,7 +1623,7 @@ void
ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey,
const ZenCacheValue& Value,
std::span<IoHash> References,
- BatchHandle* OptionalBatchHandle)
+ PutBatchHandle* OptionalBatchHandle)
{
ZEN_TRACE_CPU("Z$::Bucket::Put");
@@ -2717,7 +2944,7 @@ void
ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey,
const ZenCacheValue& Value,
std::span<IoHash> References,
- BatchHandle* OptionalBatchHandle)
+ PutBatchHandle* OptionalBatchHandle)
{
ZEN_TRACE_CPU("Z$::Bucket::PutInlineCacheValue");
if (OptionalBatchHandle != nullptr)
@@ -3540,19 +3767,19 @@ ZenCacheDiskLayer::GetOrCreateBucket(std::string_view InBucket)
return Result;
}
-struct ZenCacheDiskLayer::BatchHandle
+struct ZenCacheDiskLayer::PutBatchHandle
{
- BatchHandle(std::vector<bool>& OutResults) : OutResults(OutResults) {}
+ PutBatchHandle(std::vector<bool>& OutResults) : OutResults(OutResults) {}
struct BucketHandle
{
- CacheBucket* Bucket;
- CacheBucket::BatchHandle* Handle;
+ CacheBucket* Bucket;
+ CacheBucket::PutBatchHandle* Handle;
};
- void ForEach(const std::function<void(CacheBucket* Bucket, CacheBucket::BatchHandle* Handle)>& CB) noexcept
+ void ForEach(const std::function<void(CacheBucket* Bucket, CacheBucket::PutBatchHandle* Handle)>& CB) noexcept
{
RwLock::SharedLockScope _(Lock);
- for (ZenCacheDiskLayer::BatchHandle::BucketHandle& BucketHandle : BucketHandles)
+ for (ZenCacheDiskLayer::PutBatchHandle::BucketHandle& BucketHandle : BucketHandles)
{
ZEN_ASSERT(BucketHandle.Bucket);
ZEN_ASSERT(BucketHandle.Handle);
@@ -3560,7 +3787,7 @@ struct ZenCacheDiskLayer::BatchHandle
}
}
- CacheBucket::BatchHandle* GetHandle(CacheBucket* Bucket)
+ CacheBucket::PutBatchHandle* GetHandle(CacheBucket* Bucket)
{
{
RwLock::SharedLockScope _(Lock);
@@ -3572,7 +3799,7 @@ struct ZenCacheDiskLayer::BatchHandle
}
}
- CacheBucket::BatchHandle* NewBucketHandle = Bucket->BeginPutBatch(OutResults);
+ CacheBucket::PutBatchHandle* NewBucketHandle = Bucket->BeginPutBatch(OutResults);
if (NewBucketHandle == nullptr)
{
return nullptr;
@@ -3583,14 +3810,14 @@ struct ZenCacheDiskLayer::BatchHandle
std::find_if(BucketHandles.begin(), BucketHandles.end(), [&](BucketHandle& Handle) { return Handle.Bucket == Bucket; });
It != BucketHandles.end())
{
- CacheBucket::BatchHandle* Result = It->Handle;
+ CacheBucket::PutBatchHandle* Result = It->Handle;
ZEN_ASSERT(Result != nullptr);
_.ReleaseNow();
Bucket->EndPutBatch(NewBucketHandle);
return Result;
}
- BucketHandles.push_back(ZenCacheDiskLayer::BatchHandle::BucketHandle{.Bucket = Bucket, .Handle = NewBucketHandle});
+ BucketHandles.push_back(ZenCacheDiskLayer::PutBatchHandle::BucketHandle{.Bucket = Bucket, .Handle = NewBucketHandle});
return NewBucketHandle;
}
@@ -3599,17 +3826,93 @@ struct ZenCacheDiskLayer::BatchHandle
std::vector<bool>& OutResults;
};
-ZenCacheDiskLayer::BatchHandle*
+ZenCacheDiskLayer::PutBatchHandle*
ZenCacheDiskLayer::BeginPutBatch(std::vector<bool>& OutResults)
{
- return new BatchHandle(OutResults);
+ return new PutBatchHandle(OutResults);
+}
+
+void
+ZenCacheDiskLayer::EndPutBatch(PutBatchHandle* Batch) noexcept
+{
+ ZEN_ASSERT(Batch);
+ Batch->ForEach([&](CacheBucket* Bucket, CacheBucket::PutBatchHandle* Handle) { Bucket->EndPutBatch(Handle); });
+ delete Batch;
+}
+
+struct ZenCacheDiskLayer::GetBatchHandle
+{
+ GetBatchHandle(std::vector<ZenCacheValue>& OutResults) : OutResults(OutResults) {}
+ struct BucketHandle
+ {
+ CacheBucket* Bucket;
+ CacheBucket::GetBatchHandle* Handle;
+ };
+
+ void ForEach(const std::function<void(CacheBucket* Bucket, CacheBucket::GetBatchHandle* Handle)>& CB) noexcept
+ {
+ RwLock::SharedLockScope _(Lock);
+ for (ZenCacheDiskLayer::GetBatchHandle::BucketHandle& BucketHandle : BucketHandles)
+ {
+ ZEN_ASSERT(BucketHandle.Bucket);
+ ZEN_ASSERT(BucketHandle.Handle);
+ CB(BucketHandle.Bucket, BucketHandle.Handle);
+ }
+ }
+
+ CacheBucket::GetBatchHandle* GetHandle(CacheBucket* Bucket)
+ {
+ {
+ RwLock::SharedLockScope _(Lock);
+ if (auto It =
+ std::find_if(BucketHandles.begin(), BucketHandles.end(), [&](BucketHandle& Handle) { return Handle.Bucket == Bucket; });
+ It != BucketHandles.end())
+ {
+ return It->Handle;
+ }
+ }
+
+ CacheBucket::GetBatchHandle* NewBucketHandle = Bucket->BeginGetBatch(OutResults);
+ if (NewBucketHandle == nullptr)
+ {
+ return nullptr;
+ }
+
+ RwLock::ExclusiveLockScope _(Lock);
+ if (auto It =
+ std::find_if(BucketHandles.begin(), BucketHandles.end(), [&](BucketHandle& Handle) { return Handle.Bucket == Bucket; });
+ It != BucketHandles.end())
+ {
+ CacheBucket::GetBatchHandle* Result = It->Handle;
+ ZEN_ASSERT(Result != nullptr);
+ _.ReleaseNow();
+ Bucket->EndGetBatch(NewBucketHandle);
+ return Result;
+ }
+
+ BucketHandles.push_back(ZenCacheDiskLayer::GetBatchHandle::BucketHandle{.Bucket = Bucket, .Handle = NewBucketHandle});
+
+ return NewBucketHandle;
+ }
+
+ RwLock Lock;
+ std::vector<BucketHandle> BucketHandles;
+ std::vector<ZenCacheValue>& OutResults;
+};
+
+ZenCacheDiskLayer::GetBatchHandle*
+ZenCacheDiskLayer::BeginGetBatch(std::vector<ZenCacheValue>& OutResults)
+{
+ return new GetBatchHandle(OutResults);
}
void
-ZenCacheDiskLayer::EndPutBatch(BatchHandle* Batch) noexcept
+ZenCacheDiskLayer::EndGetBatch(GetBatchHandle* Batch) noexcept
{
+ ZEN_TRACE_CPU("Z$::GetBatched");
ZEN_ASSERT(Batch);
- Batch->ForEach([&](CacheBucket* Bucket, CacheBucket::BatchHandle* Handle) { Bucket->EndPutBatch(Handle); });
+ Batch->ForEach([&](CacheBucket* Bucket, CacheBucket::GetBatchHandle* Handle) { Bucket->EndGetBatch(Handle); });
+ TryMemCacheTrim();
delete Batch;
}
@@ -3630,17 +3933,28 @@ ZenCacheDiskLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCach
}
void
+ZenCacheDiskLayer::Get(std::string_view InBucket, const IoHash& HashKey, GetBatchHandle& BatchHandle)
+{
+ ZEN_TRACE_CPU("Z$::GetBatched");
+
+ if (CacheBucket* Bucket = GetOrCreateBucket(InBucket); Bucket != nullptr)
+ {
+ Bucket->Get(HashKey, *BatchHandle.GetHandle(Bucket));
+ }
+}
+
+void
ZenCacheDiskLayer::Put(std::string_view InBucket,
const IoHash& HashKey,
const ZenCacheValue& Value,
std::span<IoHash> References,
- BatchHandle* OptionalBatchHandle)
+ PutBatchHandle* OptionalBatchHandle)
{
ZEN_TRACE_CPU("Z$::Put");
if (CacheBucket* Bucket = GetOrCreateBucket(InBucket); Bucket != nullptr)
{
- CacheBucket::BatchHandle* BucketBatchHandle = OptionalBatchHandle == nullptr ? nullptr : OptionalBatchHandle->GetHandle(Bucket);
+ CacheBucket::PutBatchHandle* BucketBatchHandle = OptionalBatchHandle == nullptr ? nullptr : OptionalBatchHandle->GetHandle(Bucket);
Bucket->Put(HashKey, Value, References, BucketBatchHandle);
TryMemCacheTrim();
}
diff --git a/src/zenstore/cache/cacherpc.cpp b/src/zenstore/cache/cacherpc.cpp
index d28eda8c4..f6e5d16b3 100644
--- a/src/zenstore/cache/cacherpc.cpp
+++ b/src/zenstore/cache/cacherpc.cpp
@@ -985,69 +985,93 @@ CacheRpcHandler::HandleRpcGetCacheValues(const CacheRequestContext& Context, CbO
const bool HasUpstream = m_UpstreamCache.IsActive();
- CbArrayView RequestsArray = Params["Requests"sv].AsArrayView();
- for (CbFieldView RequestField : RequestsArray)
+ CbArrayView RequestsArray = Params["Requests"sv].AsArrayView();
+ std::vector<ZenCacheValue> CacheValues;
+ const uint64_t RequestCount = RequestsArray.Num();
+ CacheValues.reserve(RequestCount);
{
- ZEN_TRACE_CPU("Z$::RpcGetCacheValues::Request");
-
- m_CacheStats.RpcValueBatchRequests.fetch_add(1);
+ std::unique_ptr<ZenCacheStore::GetBatch> Batch;
+ if (RequestCount > 1)
+ {
+ Batch = std::make_unique<ZenCacheStore::GetBatch>(m_CacheStore, *Namespace, CacheValues);
+ }
+ for (CbFieldView RequestField : RequestsArray)
+ {
+ ZEN_TRACE_CPU("Z$::RpcGetCacheValues::Request");
- Stopwatch Timer;
+ m_CacheStats.RpcValueBatchRequests.fetch_add(1);
- RequestData& Request = Requests.emplace_back();
- CbObjectView RequestObject = RequestField.AsObjectView();
- CbObjectView KeyObject = RequestObject["Key"sv].AsObjectView();
+ RequestData& Request = Requests.emplace_back();
+ CbObjectView RequestObject = RequestField.AsObjectView();
+ CbObjectView KeyObject = RequestObject["Key"sv].AsObjectView();
- if (!GetRpcRequestCacheKey(KeyObject, Request.Key))
- {
- return CbPackage{};
- }
+ if (!GetRpcRequestCacheKey(KeyObject, Request.Key))
+ {
+ return CbPackage{};
+ }
- PolicyText = RequestObject["Policy"sv].AsString();
- Request.Policy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : DefaultPolicy;
+ PolicyText = RequestObject["Policy"sv].AsString();
+ Request.Policy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : DefaultPolicy;
- CacheKey& Key = Request.Key;
- CachePolicy Policy = Request.Policy;
+ CacheKey& Key = Request.Key;
+ CachePolicy Policy = Request.Policy;
- ZenCacheValue CacheValue;
- if (EnumHasAllFlags(Policy, CachePolicy::QueryLocal))
+ ZenCacheValue CacheValue;
+ if (EnumHasAllFlags(Policy, CachePolicy::QueryLocal))
+ {
+ if (Batch)
+ {
+ m_CacheStore.Get(Context, *Namespace, Key.Bucket, Key.Hash, *Batch.get());
+ }
+ else
+ {
+ CacheValues.push_back({});
+ m_CacheStore.Get(Context, *Namespace, Key.Bucket, Key.Hash, CacheValues.back());
+ }
+ }
+ else
+ {
+ CacheValues.push_back({});
+ }
+ }
+ Batch.reset();
+ ZEN_ASSERT(CacheValues.size() == RequestsArray.Num());
+ }
+ for (size_t RequestIndex = 0; RequestIndex < CacheValues.size(); RequestIndex++)
+ {
+ RequestData& Request = Requests[RequestIndex];
+ ZenCacheValue& Value = CacheValues[RequestIndex];
+ if (Value.Value)
{
- if (m_CacheStore.Get(Context, *Namespace, Key.Bucket, Key.Hash, CacheValue) &&
- IsCompressedBinary(CacheValue.Value.GetContentType()))
+ if (IsCompressedBinary(Value.Value.GetContentType()))
{
- Request.RawHash = CacheValue.RawHash;
- Request.RawSize = CacheValue.RawSize;
- Request.Result = CompressedBuffer::FromCompressedNoValidate(std::move(CacheValue.Value));
+ Request.RawHash = Value.RawHash;
+ Request.RawSize = Value.RawSize;
+ Request.Result = CompressedBuffer::FromCompressedNoValidate(std::move(Value.Value));
}
}
if (Request.Result)
{
- ZEN_DEBUG("GETCACHEVALUES HIT - '{}/{}/{}' {} ({}) in {}",
+ ZEN_DEBUG("GETCACHEVALUES HIT - '{}/{}/{}' {} ({})",
*Namespace,
- Key.Bucket,
- Key.Hash,
+ Request.Key.Bucket,
+ Request.Key.Hash,
NiceBytes(Request.Result.GetCompressed().GetSize()),
- "LOCAL"sv,
- NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
+ "LOCAL"sv);
m_CacheStats.HitCount++;
}
- else if (HasUpstream && EnumHasAllFlags(Policy, CachePolicy::QueryRemote))
+ else if (HasUpstream && EnumHasAllFlags(Request.Policy, CachePolicy::QueryRemote))
{
- RemoteRequestIndexes.push_back(Requests.size() - 1);
+ RemoteRequestIndexes.push_back(RequestIndex);
}
- else if (!EnumHasAnyFlags(Policy, CachePolicy::Query))
+ else if (!EnumHasAnyFlags(Request.Policy, CachePolicy::Query))
{
// If they requested no query, do not record this as a miss
- ZEN_DEBUG("GETCACHEVALUES DISABLEDQUERY - '{}/{}/{}'", *Namespace, Key.Bucket, Key.Hash);
+ ZEN_DEBUG("GETCACHEVALUES DISABLEDQUERY - '{}/{}/{}'", *Namespace, Request.Key.Bucket, Request.Key.Hash);
}
else
{
- ZEN_DEBUG("GETCACHEVALUES MISS - '{}/{}/{}' ({}) in {}",
- *Namespace,
- Key.Bucket,
- Key.Hash,
- "LOCAL"sv,
- NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
+ ZEN_DEBUG("GETCACHEVALUES MISS - '{}/{}/{}' ({})", *Namespace, Request.Key.Bucket, Request.Key.Hash, "LOCAL"sv);
m_CacheStats.MissCount++;
}
}
@@ -1497,29 +1521,61 @@ CacheRpcHandler::GetLocalCacheValues(const CacheRequestContext& Context,
using namespace cache::detail;
const bool HasUpstream = m_UpstreamCache.IsActive();
- for (ChunkRequest* Request : ValueRequests)
+ std::vector<ZenCacheValue> Chunks;
+ Chunks.reserve(ValueRequests.size());
{
- ZEN_TRACE_CPU("Z$::GetLocalCacheValues::Value");
-
- Stopwatch Timer;
- if (!Request->Exists && EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::QueryLocal))
+ std::unique_ptr<ZenCacheStore::GetBatch> Batch;
+ if (ValueRequests.size() > 1)
{
- ZenCacheValue CacheValue;
- if (m_CacheStore.Get(Context, Namespace, Request->Key->Key.Bucket, Request->Key->Key.Hash, CacheValue))
+ Batch = std::make_unique<ZenCacheStore::GetBatch>(m_CacheStore, Namespace, Chunks);
+ }
+ for (ChunkRequest* Request : ValueRequests)
+ {
+ Stopwatch Timer;
+ if (!Request->Exists && EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::QueryLocal))
{
- if (IsCompressedBinary(CacheValue.Value.GetContentType()))
+ if (Batch)
{
- Request->Key->ChunkId = CacheValue.RawHash;
- Request->Exists = true;
- Request->RawSize = CacheValue.RawSize;
- Request->RawSizeKnown = true;
- if (!EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::SkipData))
- {
- Request->Value = CompressedBuffer::FromCompressedNoValidate(std::move(CacheValue.Value));
- }
+ m_CacheStore.Get(Context, Namespace, Request->Key->Key.Bucket, Request->Key->Key.Hash, *Batch.get());
+ }
+ else
+ {
+ Chunks.push_back({});
+ m_CacheStore.Get(Context, Namespace, Request->Key->Key.Bucket, Request->Key->Key.Hash, Chunks.back());
+ }
+ }
+ else
+ {
+ Chunks.push_back({});
+ }
+ Request->ElapsedTimeUs += Timer.GetElapsedTimeUs();
+ }
+ }
+ for (size_t RequestIndex = 0; RequestIndex < ValueRequests.size(); RequestIndex++)
+ {
+ Stopwatch Timer;
+ ChunkRequest* Request = ValueRequests[RequestIndex];
+ if (Chunks[RequestIndex].Value)
+ {
+ if (IsCompressedBinary(Chunks[RequestIndex].Value.GetContentType()))
+ {
+ Request->Key->ChunkId = Chunks[RequestIndex].RawHash;
+ Request->Exists = true;
+ Request->RawSize = Chunks[RequestIndex].RawSize;
+ Request->RawSizeKnown = true;
+ if (!EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::SkipData))
+ {
+ Request->Value = CompressedBuffer::FromCompressedNoValidate(std::move(Chunks[RequestIndex].Value));
}
}
}
+ Request->ElapsedTimeUs += Timer.GetElapsedTimeUs();
+ }
+
+ for (ChunkRequest* Request : ValueRequests)
+ {
+ ZEN_TRACE_CPU("Z$::GetLocalCacheValues::Value");
+ Stopwatch Timer;
if (HasUpstream && !Request->Exists && EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::QueryRemote))
{
if (EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::StoreLocal))
diff --git a/src/zenstore/cache/structuredcachestore.cpp b/src/zenstore/cache/structuredcachestore.cpp
index 9eded2a50..d9c2d3e59 100644
--- a/src/zenstore/cache/structuredcachestore.cpp
+++ b/src/zenstore/cache/structuredcachestore.cpp
@@ -137,21 +137,21 @@ ZenCacheNamespace::~ZenCacheNamespace()
m_Gc.RemoveGcContributor(this);
}
-struct ZenCacheNamespace::BatchHandle
+struct ZenCacheNamespace::PutBatchHandle
{
- ZenCacheDiskLayer::BatchHandle* DiskLayerHandle = nullptr;
+ ZenCacheDiskLayer::PutBatchHandle* DiskLayerHandle = nullptr;
};
-ZenCacheNamespace::BatchHandle*
+ZenCacheNamespace::PutBatchHandle*
ZenCacheNamespace::BeginPutBatch(std::vector<bool>& OutResult)
{
- ZenCacheNamespace::BatchHandle* Handle = new ZenCacheNamespace::BatchHandle;
- Handle->DiskLayerHandle = m_DiskLayer.BeginPutBatch(OutResult);
+ ZenCacheNamespace::PutBatchHandle* Handle = new ZenCacheNamespace::PutBatchHandle;
+ Handle->DiskLayerHandle = m_DiskLayer.BeginPutBatch(OutResult);
return Handle;
}
void
-ZenCacheNamespace::EndPutBatch(BatchHandle* Batch) noexcept
+ZenCacheNamespace::EndPutBatch(PutBatchHandle* Batch) noexcept
{
try
{
@@ -165,6 +165,50 @@ ZenCacheNamespace::EndPutBatch(BatchHandle* Batch) noexcept
}
}
+struct ZenCacheNamespace::GetBatchHandle
+{
+ GetBatchHandle(std::vector<ZenCacheValue>& OutResult) : Results(OutResult) {}
+ std::vector<ZenCacheValue>& Results;
+ ZenCacheDiskLayer::GetBatchHandle* DiskLayerHandle = nullptr;
+};
+
+ZenCacheNamespace::GetBatchHandle*
+ZenCacheNamespace::BeginGetBatch(std::vector<ZenCacheValue>& OutResult)
+{
+ ZenCacheNamespace::GetBatchHandle* Handle = new ZenCacheNamespace::GetBatchHandle(OutResult);
+ Handle->DiskLayerHandle = m_DiskLayer.BeginGetBatch(OutResult);
+ return Handle;
+}
+
+void
+ZenCacheNamespace::EndGetBatch(GetBatchHandle* Batch) noexcept
+{
+ try
+ {
+ ZEN_ASSERT(Batch);
+ m_DiskLayer.EndGetBatch(Batch->DiskLayerHandle);
+
+ metrics::RequestStats::Scope StatsScope(m_GetOps, 0);
+ for (const ZenCacheValue& Result : Batch->Results)
+ {
+ if (Result.Value)
+ {
+ m_HitCount++;
+ StatsScope.SetBytes(Result.Value.Size());
+ }
+ else
+ {
+ m_MissCount++;
+ }
+ }
+ delete Batch;
+ }
+ catch (std::exception& Ex)
+ {
+ ZEN_ERROR("Exception in cache namespace layer when ending batch put operation: '{}'", Ex.what());
+ }
+}
+
bool
ZenCacheNamespace::Get(std::string_view InBucket, const IoHash& HashKey, ZenCacheValue& OutValue)
{
@@ -173,7 +217,6 @@ ZenCacheNamespace::Get(std::string_view InBucket, const IoHash& HashKey, ZenCach
metrics::RequestStats::Scope StatsScope(m_GetOps, 0);
bool Ok = m_DiskLayer.Get(InBucket, HashKey, OutValue);
-
if (Ok)
{
ZEN_ASSERT(OutValue.Value.Size());
@@ -188,11 +231,22 @@ ZenCacheNamespace::Get(std::string_view InBucket, const IoHash& HashKey, ZenCach
}
void
+ZenCacheNamespace::Get(std::string_view InBucket, const IoHash& HashKey, GetBatchHandle& BatchHandle)
+{
+ ZEN_TRACE_CPU("Z$::Namespace::GetBatched");
+
+ metrics::RequestStats::Scope StatsScope(m_GetOps, 0);
+
+ m_DiskLayer.Get(InBucket, HashKey, *BatchHandle.DiskLayerHandle);
+ return;
+}
+
+void
ZenCacheNamespace::Put(std::string_view InBucket,
const IoHash& HashKey,
const ZenCacheValue& Value,
std::span<IoHash> References,
- BatchHandle* OptionalBatchHandle)
+ PutBatchHandle* OptionalBatchHandle)
{
ZEN_TRACE_CPU("Z$::Namespace::Put");
@@ -202,7 +256,8 @@ ZenCacheNamespace::Put(std::string_view InBucket,
ZEN_ASSERT(Value.Value.Size());
- m_DiskLayer.Put(InBucket, HashKey, Value, References, OptionalBatchHandle ? OptionalBatchHandle->DiskLayerHandle : nullptr);
+ ZenCacheDiskLayer::PutBatchHandle* DiskLayerBatchHandle = OptionalBatchHandle ? OptionalBatchHandle->DiskLayerHandle : nullptr;
+ m_DiskLayer.Put(InBucket, HashKey, Value, References, DiskLayerBatchHandle);
m_WriteCount++;
}
@@ -500,6 +555,43 @@ ZenCacheStore::PutBatch::~PutBatch()
}
}
+ZenCacheStore::GetBatch::GetBatch(ZenCacheStore& CacheStore, std::string_view InNamespace, std::vector<ZenCacheValue>& OutResult)
+: m_CacheStore(CacheStore)
+, Results(OutResult)
+{
+ if (m_Store = m_CacheStore.GetNamespace(InNamespace); m_Store)
+ {
+ m_NamespaceBatchHandle = m_Store->BeginGetBatch(OutResult);
+ }
+}
+
+ZenCacheStore::GetBatch::~GetBatch()
+{
+ try
+ {
+ if (m_Store)
+ {
+ ZEN_ASSERT(m_NamespaceBatchHandle);
+ m_Store->EndGetBatch(m_NamespaceBatchHandle);
+
+ metrics::RequestStats::Scope OpScope(m_CacheStore.m_GetOps, 0);
+ for (const ZenCacheValue& Result : Results)
+ {
+ if (Result.Value)
+ {
+ m_CacheStore.m_HitCount++;
+ OpScope.SetBytes(Result.Value.GetSize());
+ }
+ m_CacheStore.m_MissCount++;
+ }
+ }
+ }
+ catch (std::exception& Ex)
+ {
+ ZEN_ERROR("Exception in cache store when ending batch get operation: '{}'", Ex.what());
+ }
+}
+
bool
ZenCacheStore::Get(const CacheRequestContext& Context,
std::string_view Namespace,
@@ -562,6 +654,39 @@ ZenCacheStore::Get(const CacheRequestContext& Context,
}
void
+ZenCacheStore::Get(const CacheRequestContext& Context,
+ std::string_view Namespace,
+ std::string_view Bucket,
+ const IoHash& HashKey,
+ GetBatch& BatchHandle)
+{
+ // Ad hoc rejection of known bad usage patterns for DDC bucket names
+
+ if (IsKnownBadBucketName(Bucket))
+ {
+ m_RejectedReadCount++;
+ return;
+ }
+ ZEN_TRACE_CPU("Z$::Get");
+
+ metrics::RequestStats::Scope OpScope(m_GetOps, 0);
+
+ if (ZenCacheNamespace* Store = GetNamespace(Namespace); Store)
+ {
+ Store->Get(Bucket, HashKey, *BatchHandle.m_NamespaceBatchHandle);
+ return;
+ }
+
+ ZEN_WARN("request for unknown namespace '{}' in ZenCacheStore::Get [{}], bucket '{}', key '{}'",
+ Context,
+ Namespace,
+ Bucket,
+ HashKey.ToHexString());
+
+ m_MissCount++;
+}
+
+void
ZenCacheStore::Put(const CacheRequestContext& Context,
std::string_view Namespace,
std::string_view Bucket,
@@ -603,7 +728,7 @@ ZenCacheStore::Put(const CacheRequestContext& Context,
if (ZenCacheNamespace* Store = GetNamespace(Namespace); Store)
{
- ZenCacheNamespace::BatchHandle* BatchHandle = OptionalBatchHandle ? OptionalBatchHandle->m_NamespaceBatchHandle : nullptr;
+ ZenCacheNamespace::PutBatchHandle* BatchHandle = OptionalBatchHandle ? OptionalBatchHandle->m_NamespaceBatchHandle : nullptr;
Store->Put(Bucket, HashKey, Value, References, BatchHandle);
m_WriteCount++;
return;
diff --git a/src/zenstore/include/zenstore/blockstore.h b/src/zenstore/include/zenstore/blockstore.h
index a1e497533..ba8259c82 100644
--- a/src/zenstore/include/zenstore/blockstore.h
+++ b/src/zenstore/include/zenstore/blockstore.h
@@ -107,6 +107,7 @@ private:
const std::filesystem::path m_Path;
IoBuffer m_IoBuffer;
BasicFile m_File;
+ uint64_t m_CachedFileSize = 0;
};
class BlockStoreCompactState;
@@ -183,7 +184,8 @@ public:
bool IterateBlock(std::span<const BlockStoreLocation> ChunkLocations,
std::span<const size_t> ChunkIndexes,
const IterateChunksSmallSizeCallback& SmallSizeCallback,
- const IterateChunksLargeSizeCallback& LargeSizeCallback);
+ const IterateChunksLargeSizeCallback& LargeSizeCallback,
+ uint64_t LargeSizeLimit = 0);
void CompactBlocks(
const BlockStoreCompactState& CompactState,
diff --git a/src/zenstore/include/zenstore/cache/cachedisklayer.h b/src/zenstore/include/zenstore/cache/cachedisklayer.h
index 427c338d6..9dee4d3f7 100644
--- a/src/zenstore/include/zenstore/cache/cachedisklayer.h
+++ b/src/zenstore/include/zenstore/cache/cachedisklayer.h
@@ -164,16 +164,21 @@ public:
explicit ZenCacheDiskLayer(GcManager& Gc, JobQueue& JobQueue, const std::filesystem::path& RootDir, const Configuration& Config);
~ZenCacheDiskLayer();
- struct BatchHandle;
- BatchHandle* BeginPutBatch(std::vector<bool>& OutResult);
- void EndPutBatch(BatchHandle* Batch) noexcept;
+ struct GetBatchHandle;
+ GetBatchHandle* BeginGetBatch(std::vector<ZenCacheValue>& OutResult);
+ void EndGetBatch(GetBatchHandle* Batch) noexcept;
+ bool Get(std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue);
+ void Get(std::string_view Bucket, const IoHash& HashKey, GetBatchHandle& BatchHandle);
+
+ struct PutBatchHandle;
+ PutBatchHandle* BeginPutBatch(std::vector<bool>& OutResult);
+ void EndPutBatch(PutBatchHandle* Batch) noexcept;
- bool Get(std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue);
void Put(std::string_view Bucket,
const IoHash& HashKey,
const ZenCacheValue& Value,
std::span<IoHash> References,
- BatchHandle* OptionalBatchHandle);
+ PutBatchHandle* OptionalBatchHandle);
bool Drop();
bool DropBucket(std::string_view Bucket);
void Flush();
@@ -205,12 +210,17 @@ public:
~CacheBucket();
bool OpenOrCreate(std::filesystem::path BucketDir, bool AllowCreate = true);
- struct BatchHandle;
- BatchHandle* BeginPutBatch(std::vector<bool>& OutResult);
- void EndPutBatch(BatchHandle* Batch) noexcept;
- bool Get(const IoHash& HashKey, ZenCacheValue& OutValue);
- void Put(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References, BatchHandle* OptionalBatchHandle);
+ struct GetBatchHandle;
+ GetBatchHandle* BeginGetBatch(std::vector<ZenCacheValue>& OutResult);
+ void EndGetBatch(GetBatchHandle* Batch) noexcept;
+ bool Get(const IoHash& HashKey, ZenCacheValue& OutValue);
+ void Get(const IoHash& HashKey, GetBatchHandle& BatchHandle);
+
+ struct PutBatchHandle;
+ PutBatchHandle* BeginPutBatch(std::vector<bool>& OutResult);
+ void EndPutBatch(PutBatchHandle* Batch) noexcept;
+ void Put(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References, PutBatchHandle* OptionalBatchHandle);
uint64_t MemCacheTrim(GcClock::TimePoint ExpireTime);
bool Drop();
void Flush();
@@ -342,7 +352,7 @@ public:
void PutInlineCacheValue(const IoHash& HashKey,
const ZenCacheValue& Value,
std::span<IoHash> References,
- BatchHandle* OptionalBatchHandle = nullptr);
+ PutBatchHandle* OptionalBatchHandle = nullptr);
IoBuffer GetInlineCacheValue(const DiskLocation& Loc) const;
CacheValueDetails::ValueDetails GetValueDetails(RwLock::SharedLockScope&, const IoHash& Key, PayloadIndex Index) const;
diff --git a/src/zenstore/include/zenstore/cache/structuredcachestore.h b/src/zenstore/include/zenstore/cache/structuredcachestore.h
index 02bbeed77..7460d01ce 100644
--- a/src/zenstore/include/zenstore/cache/structuredcachestore.h
+++ b/src/zenstore/include/zenstore/cache/structuredcachestore.h
@@ -80,16 +80,21 @@ public:
ZenCacheNamespace(GcManager& Gc, JobQueue& JobQueue, const std::filesystem::path& RootDir, const Configuration& Config);
~ZenCacheNamespace();
- struct BatchHandle;
- BatchHandle* BeginPutBatch(std::vector<bool>& OutResults);
- void EndPutBatch(BatchHandle* Batch) noexcept;
+ struct PutBatchHandle;
+ PutBatchHandle* BeginPutBatch(std::vector<bool>& OutResults);
+ void EndPutBatch(PutBatchHandle* Batch) noexcept;
+
+ struct GetBatchHandle;
+ GetBatchHandle* BeginGetBatch(std::vector<ZenCacheValue>& OutResults);
+ void EndGetBatch(GetBatchHandle* Batch) noexcept;
bool Get(std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue);
+ void Get(std::string_view Bucket, const IoHash& HashKey, GetBatchHandle& OptionalBatchHandle);
void Put(std::string_view Bucket,
const IoHash& HashKey,
const ZenCacheValue& Value,
std::span<IoHash> References,
- BatchHandle* OptionalBatchHandle = nullptr);
+ PutBatchHandle* OptionalBatchHandle = nullptr);
bool DropBucket(std::string_view Bucket);
void EnumerateBucketContents(std::string_view Bucket,
@@ -199,10 +204,24 @@ public:
~PutBatch();
private:
- ZenCacheStore& m_CacheStore;
- ZenCacheNamespace* m_Store = nullptr;
- ZenCacheNamespace::BatchHandle* m_NamespaceBatchHandle = nullptr;
+ ZenCacheStore& m_CacheStore;
+ ZenCacheNamespace* m_Store = nullptr;
+ ZenCacheNamespace::PutBatchHandle* m_NamespaceBatchHandle = nullptr;
+
+ friend class ZenCacheStore;
+ };
+
+ class GetBatch
+ {
+ public:
+ GetBatch(ZenCacheStore& CacheStore, std::string_view Namespace, std::vector<ZenCacheValue>& OutResult);
+ ~GetBatch();
+ private:
+ ZenCacheStore& m_CacheStore;
+ ZenCacheNamespace* m_Store = nullptr;
+ ZenCacheNamespace::GetBatchHandle* m_NamespaceBatchHandle = nullptr;
+ std::vector<ZenCacheValue>& Results;
friend class ZenCacheStore;
};
@@ -212,6 +231,12 @@ public:
const IoHash& HashKey,
ZenCacheValue& OutValue);
+ void Get(const CacheRequestContext& Context,
+ std::string_view Namespace,
+ std::string_view Bucket,
+ const IoHash& HashKey,
+ GetBatch& BatchHandle);
+
void Put(const CacheRequestContext& Context,
std::string_view Namespace,
std::string_view Bucket,