aboutsummaryrefslogtreecommitdiff
path: root/src/zenstore/cache/cachedisklayer.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2024-06-04 19:30:34 +0200
committerGitHub Enterprise <[email protected]>2024-06-04 19:30:34 +0200
commitbbe46452530a98a0bd36c0024d4f3f914ae23604 (patch)
treee9c901a6ec68d087bc7e746b38d1573b2999a0ef /src/zenstore/cache/cachedisklayer.cpp
parentUse a smaller thread pool for network operations when doing oplog import to r... (diff)
downloadzen-bbe46452530a98a0bd36c0024d4f3f914ae23604.tar.xz
zen-bbe46452530a98a0bd36c0024d4f3f914ae23604.zip
add batching of CacheStore requests for GetCacheValues/GetCacheChunks (#90)
* cache file size of block on open * add ability to control size limit for small chunk callback when iterating block * Add batch fetch of cache values in the GetCacheValues request
Diffstat (limited to 'src/zenstore/cache/cachedisklayer.cpp')
-rw-r--r--src/zenstore/cache/cachedisklayer.cpp360
1 files changed, 337 insertions, 23 deletions
diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp
index d67e8d6c8..9dd2e4a67 100644
--- a/src/zenstore/cache/cachedisklayer.cpp
+++ b/src/zenstore/cache/cachedisklayer.cpp
@@ -1170,9 +1170,9 @@ ZenCacheDiskLayer::CacheBucket::GetStandaloneCacheValue(const DiskLocation& Loc,
return {};
}
-struct ZenCacheDiskLayer::CacheBucket::BatchHandle
+struct ZenCacheDiskLayer::CacheBucket::PutBatchHandle
{
- BatchHandle(std::vector<bool>& OutResults) : OutResults(OutResults) {}
+ PutBatchHandle(std::vector<bool>& OutResults) : OutResults(OutResults) {}
struct Entry
{
std::vector<IoHash> HashKeyAndReferences;
@@ -1184,14 +1184,14 @@ struct ZenCacheDiskLayer::CacheBucket::BatchHandle
std::vector<bool>& OutResults;
};
-ZenCacheDiskLayer::CacheBucket::BatchHandle*
+ZenCacheDiskLayer::CacheBucket::PutBatchHandle*
ZenCacheDiskLayer::CacheBucket::BeginPutBatch(std::vector<bool>& OutResults)
{
- return new BatchHandle(OutResults);
+ return new PutBatchHandle(OutResults);
}
void
-ZenCacheDiskLayer::CacheBucket::EndPutBatch(BatchHandle* Batch) noexcept
+ZenCacheDiskLayer::CacheBucket::EndPutBatch(PutBatchHandle* Batch) noexcept
{
try
{
@@ -1272,6 +1272,232 @@ ZenCacheDiskLayer::CacheBucket::EndPutBatch(BatchHandle* Batch) noexcept
}
}
+struct ZenCacheDiskLayer::CacheBucket::GetBatchHandle
+{
+ GetBatchHandle(std::vector<ZenCacheValue>& OutResults) : OutResults(OutResults)
+ {
+ Keys.reserve(OutResults.capacity());
+ ResultIndexes.reserve(OutResults.capacity());
+ }
+
+ std::vector<IoHash> Keys;
+ std::vector<size_t> ResultIndexes;
+
+ std::vector<ZenCacheValue>& OutResults;
+};
+
+ZenCacheDiskLayer::CacheBucket::GetBatchHandle*
+ZenCacheDiskLayer::CacheBucket::BeginGetBatch(std::vector<ZenCacheValue>& OutResult)
+{
+ ZEN_TRACE_CPU("Z$::Bucket::GetBatched");
+ return new GetBatchHandle(OutResult);
+}
+
+void
+ZenCacheDiskLayer::CacheBucket::EndGetBatch(GetBatchHandle* Batch) noexcept
+{
+ ZEN_TRACE_CPU("Z$::Bucket::GetBatched");
+
+ ZEN_ASSERT(Batch);
+ ZEN_ASSERT(Batch->Keys.size() == Batch->ResultIndexes.size());
+
+ metrics::RequestStats::Scope StatsScope(m_GetOps, 0);
+
+ if (!Batch->ResultIndexes.empty())
+ {
+ std::vector<DiskLocation> StandaloneDiskLocations;
+ std::vector<size_t> StandaloneKeyIndexes;
+ std::vector<DiskLocation> InlineDiskLocations;
+ std::vector<BlockStoreLocation> InlineBlockLocations;
+ std::vector<size_t> InlineKeyIndexes;
+ std::vector<bool> FillRawHashAndRawSize(Batch->Keys.size(), false);
+ {
+ RwLock::SharedLockScope IndexLock(m_IndexLock);
+ for (size_t KeyIndex = 0; KeyIndex < Batch->Keys.size(); KeyIndex++)
+ {
+ const IoHash& HashKey = Batch->Keys[KeyIndex];
+ auto It = m_Index.find(HashKey);
+ if (It != m_Index.end())
+ {
+ size_t ResultIndex = Batch->ResultIndexes[KeyIndex];
+ ZenCacheValue& OutValue = Batch->OutResults[ResultIndex];
+
+ const PayloadIndex PayloadIdx = It.value();
+ m_AccessTimes[PayloadIdx] = GcClock::TickCount();
+ const BucketPayload& Payload = m_Payloads[PayloadIdx];
+ const DiskLocation& Location = Payload.Location;
+
+ FillRawHashAndRawSize[KeyIndex] = (!Location.IsFlagSet(DiskLocation::kStructured)) && (Location.Size() > 0);
+ if (Payload.MetaData)
+ {
+ const BucketMetaData& MetaData = m_MetaDatas[Payload.MetaData];
+ OutValue.RawHash = MetaData.RawHash;
+ OutValue.RawSize = MetaData.RawSize;
+ FillRawHashAndRawSize[KeyIndex] = false;
+ }
+
+ if (Payload.MemCached)
+ {
+ ZEN_ASSERT(!FillRawHashAndRawSize[KeyIndex]);
+ OutValue.Value = m_MemCachedPayloads[Payload.MemCached].Payload;
+ m_MemoryHitCount++;
+ }
+ else
+ {
+ if (m_Configuration.MemCacheSizeThreshold > 0)
+ {
+ m_MemoryMissCount++;
+ }
+ if (Location.IsFlagSet(DiskLocation::kStandaloneFile))
+ {
+ StandaloneDiskLocations.push_back(Location);
+ StandaloneKeyIndexes.push_back(KeyIndex);
+ }
+ else
+ {
+ InlineDiskLocations.push_back(Location);
+ InlineBlockLocations.emplace_back(Location.GetBlockLocation(m_Configuration.PayloadAlignment));
+ InlineKeyIndexes.push_back(KeyIndex);
+ }
+ }
+ }
+ }
+ }
+
+ auto FillOne = [&](const DiskLocation& Location, size_t KeyIndex, IoBuffer&& Value) {
+ if (!Value)
+ {
+ return;
+ }
+ size_t ResultIndex = Batch->ResultIndexes[KeyIndex];
+ ZenCacheValue& OutValue = Batch->OutResults[ResultIndex];
+ OutValue.Value = std::move(Value);
+ OutValue.Value.SetContentType(Location.GetContentType());
+
+ bool AddToMemCache = false;
+ bool SetMetaInfo = FillRawHashAndRawSize[KeyIndex];
+ if (m_Configuration.MemCacheSizeThreshold > 0)
+ {
+ size_t ValueSize = OutValue.Value.GetSize();
+ if (OutValue.Value && ValueSize <= m_Configuration.MemCacheSizeThreshold)
+ {
+ OutValue.Value = IoBufferBuilder::ReadFromFileMaybe(OutValue.Value);
+ AddToMemCache = true;
+ }
+ }
+
+ if (SetMetaInfo)
+ {
+ ZEN_TRACE_CPU("Z$::Bucket::EndGetBatch::MetaData");
+ if (Location.IsFlagSet(DiskLocation::kCompressed))
+ {
+ if (!CompressedBuffer::ValidateCompressedHeader(OutValue.Value, OutValue.RawHash, OutValue.RawSize))
+ {
+ OutValue = ZenCacheValue{};
+ }
+ }
+ else
+ {
+ OutValue.RawHash = IoHash::HashBuffer(OutValue.Value);
+ OutValue.RawSize = OutValue.Value.GetSize();
+ }
+ }
+
+ if (SetMetaInfo || AddToMemCache)
+ {
+ ZEN_TRACE_CPU("Z$::Bucket::EndGetBatch::MemCache");
+ const IoHash& Key = Batch->Keys[KeyIndex];
+ RwLock::ExclusiveLockScope UpdateIndexLock(m_IndexLock);
+ {
+ if (auto UpdateIt = m_Index.find(Key); UpdateIt != m_Index.end())
+ {
+ BucketPayload& Payload = m_Payloads[UpdateIt->second];
+
+ // Only update if it has not already been updated by other thread
+ if (!Payload.MetaData && SetMetaInfo)
+ {
+ SetMetaData(UpdateIndexLock, Payload, {.RawSize = OutValue.RawSize, .RawHash = OutValue.RawHash});
+ }
+ if (!Payload.MemCached && AddToMemCache)
+ {
+ SetMemCachedData(UpdateIndexLock, UpdateIt->second, OutValue.Value);
+ }
+ }
+ }
+ }
+ };
+
+ // We don't want to read into memory if they are too big, since we might only want to touch the compressed
+ // header before sending it along
+ if (!InlineDiskLocations.empty())
+ {
+ ZEN_TRACE_CPU("Z$::Bucket::EndGetBatch::ReadInline");
+ m_BlockStore.IterateChunks(InlineBlockLocations, [&](uint32_t, std::span<const size_t> ChunkIndexes) -> bool {
+ const uint64_t LargeChunkSizeLimit = Max(m_Configuration.MemCacheSizeThreshold, 32u * 1024u);
+ m_BlockStore.IterateBlock(
+ InlineBlockLocations,
+ ChunkIndexes,
+ [this, &FillOne, &InlineDiskLocations, &InlineKeyIndexes](size_t ChunkIndex, const void* Data, uint64_t Size) -> bool {
+ if (Data != nullptr)
+ {
+ FillOne(InlineDiskLocations[ChunkIndex],
+ InlineKeyIndexes[ChunkIndex],
+ IoBufferBuilder::MakeCloneFromMemory(Data, Size));
+ }
+ return true;
+ },
+ [this, &FillOne, &InlineDiskLocations, &InlineKeyIndexes](size_t ChunkIndex,
+ BlockStoreFile& File,
+ uint64_t Offset,
+ uint64_t Size) -> bool {
+ FillOne(InlineDiskLocations[ChunkIndex], InlineKeyIndexes[ChunkIndex], File.GetChunk(Offset, Size));
+ return true;
+ },
+ LargeChunkSizeLimit);
+ return true;
+ });
+ }
+
+ if (!StandaloneDiskLocations.empty())
+ {
+ ZEN_TRACE_CPU("Z$::Bucket::EndGetBatch::ReadStandalone");
+ for (size_t Index = 0; Index < StandaloneDiskLocations.size(); Index++)
+ {
+ size_t KeyIndex = StandaloneKeyIndexes[Index];
+ const DiskLocation& Location = StandaloneDiskLocations[Index];
+ FillOne(Location, KeyIndex, GetStandaloneCacheValue(Location, Batch->Keys[KeyIndex]));
+ }
+ }
+
+ for (size_t ResultIndex : Batch->ResultIndexes)
+ {
+ bool Hit = !!Batch->OutResults[ResultIndex].Value;
+ if (Hit)
+ {
+ m_DiskHitCount++;
+ StatsScope.SetBytes(Batch->OutResults[ResultIndex].Value.GetSize());
+ }
+ else
+ {
+ m_DiskMissCount++;
+ if (m_Configuration.MemCacheSizeThreshold > 0)
+ {
+ m_MemoryMissCount++;
+ }
+ }
+ }
+ }
+}
+
+void
+ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, GetBatchHandle& BatchHandle)
+{
+ ZEN_TRACE_CPU("Z$::Bucket::GetBatched");
+ BatchHandle.Keys.push_back(HashKey);
+ BatchHandle.ResultIndexes.push_back(BatchHandle.OutResults.size());
+ BatchHandle.OutResults.push_back({});
+}
+
bool
ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutValue)
{
@@ -1379,6 +1605,7 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal
}
}
}
+
if (OutValue.Value)
{
m_DiskHitCount++;
@@ -1396,7 +1623,7 @@ void
ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey,
const ZenCacheValue& Value,
std::span<IoHash> References,
- BatchHandle* OptionalBatchHandle)
+ PutBatchHandle* OptionalBatchHandle)
{
ZEN_TRACE_CPU("Z$::Bucket::Put");
@@ -2717,7 +2944,7 @@ void
ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey,
const ZenCacheValue& Value,
std::span<IoHash> References,
- BatchHandle* OptionalBatchHandle)
+ PutBatchHandle* OptionalBatchHandle)
{
ZEN_TRACE_CPU("Z$::Bucket::PutInlineCacheValue");
if (OptionalBatchHandle != nullptr)
@@ -3540,19 +3767,19 @@ ZenCacheDiskLayer::GetOrCreateBucket(std::string_view InBucket)
return Result;
}
-struct ZenCacheDiskLayer::BatchHandle
+struct ZenCacheDiskLayer::PutBatchHandle
{
- BatchHandle(std::vector<bool>& OutResults) : OutResults(OutResults) {}
+ PutBatchHandle(std::vector<bool>& OutResults) : OutResults(OutResults) {}
struct BucketHandle
{
- CacheBucket* Bucket;
- CacheBucket::BatchHandle* Handle;
+ CacheBucket* Bucket;
+ CacheBucket::PutBatchHandle* Handle;
};
- void ForEach(const std::function<void(CacheBucket* Bucket, CacheBucket::BatchHandle* Handle)>& CB) noexcept
+ void ForEach(const std::function<void(CacheBucket* Bucket, CacheBucket::PutBatchHandle* Handle)>& CB) noexcept
{
RwLock::SharedLockScope _(Lock);
- for (ZenCacheDiskLayer::BatchHandle::BucketHandle& BucketHandle : BucketHandles)
+ for (ZenCacheDiskLayer::PutBatchHandle::BucketHandle& BucketHandle : BucketHandles)
{
ZEN_ASSERT(BucketHandle.Bucket);
ZEN_ASSERT(BucketHandle.Handle);
@@ -3560,7 +3787,7 @@ struct ZenCacheDiskLayer::BatchHandle
}
}
- CacheBucket::BatchHandle* GetHandle(CacheBucket* Bucket)
+ CacheBucket::PutBatchHandle* GetHandle(CacheBucket* Bucket)
{
{
RwLock::SharedLockScope _(Lock);
@@ -3572,7 +3799,7 @@ struct ZenCacheDiskLayer::BatchHandle
}
}
- CacheBucket::BatchHandle* NewBucketHandle = Bucket->BeginPutBatch(OutResults);
+ CacheBucket::PutBatchHandle* NewBucketHandle = Bucket->BeginPutBatch(OutResults);
if (NewBucketHandle == nullptr)
{
return nullptr;
@@ -3583,14 +3810,14 @@ struct ZenCacheDiskLayer::BatchHandle
std::find_if(BucketHandles.begin(), BucketHandles.end(), [&](BucketHandle& Handle) { return Handle.Bucket == Bucket; });
It != BucketHandles.end())
{
- CacheBucket::BatchHandle* Result = It->Handle;
+ CacheBucket::PutBatchHandle* Result = It->Handle;
ZEN_ASSERT(Result != nullptr);
_.ReleaseNow();
Bucket->EndPutBatch(NewBucketHandle);
return Result;
}
- BucketHandles.push_back(ZenCacheDiskLayer::BatchHandle::BucketHandle{.Bucket = Bucket, .Handle = NewBucketHandle});
+ BucketHandles.push_back(ZenCacheDiskLayer::PutBatchHandle::BucketHandle{.Bucket = Bucket, .Handle = NewBucketHandle});
return NewBucketHandle;
}
@@ -3599,17 +3826,93 @@ struct ZenCacheDiskLayer::BatchHandle
std::vector<bool>& OutResults;
};
-ZenCacheDiskLayer::BatchHandle*
+ZenCacheDiskLayer::PutBatchHandle*
ZenCacheDiskLayer::BeginPutBatch(std::vector<bool>& OutResults)
{
- return new BatchHandle(OutResults);
+ return new PutBatchHandle(OutResults);
+}
+
+void
+ZenCacheDiskLayer::EndPutBatch(PutBatchHandle* Batch) noexcept
+{
+ ZEN_ASSERT(Batch);
+ Batch->ForEach([&](CacheBucket* Bucket, CacheBucket::PutBatchHandle* Handle) { Bucket->EndPutBatch(Handle); });
+ delete Batch;
+}
+
+struct ZenCacheDiskLayer::GetBatchHandle
+{
+ GetBatchHandle(std::vector<ZenCacheValue>& OutResults) : OutResults(OutResults) {}
+ struct BucketHandle
+ {
+ CacheBucket* Bucket;
+ CacheBucket::GetBatchHandle* Handle;
+ };
+
+ void ForEach(const std::function<void(CacheBucket* Bucket, CacheBucket::GetBatchHandle* Handle)>& CB) noexcept
+ {
+ RwLock::SharedLockScope _(Lock);
+ for (ZenCacheDiskLayer::GetBatchHandle::BucketHandle& BucketHandle : BucketHandles)
+ {
+ ZEN_ASSERT(BucketHandle.Bucket);
+ ZEN_ASSERT(BucketHandle.Handle);
+ CB(BucketHandle.Bucket, BucketHandle.Handle);
+ }
+ }
+
+ CacheBucket::GetBatchHandle* GetHandle(CacheBucket* Bucket)
+ {
+ {
+ RwLock::SharedLockScope _(Lock);
+ if (auto It =
+ std::find_if(BucketHandles.begin(), BucketHandles.end(), [&](BucketHandle& Handle) { return Handle.Bucket == Bucket; });
+ It != BucketHandles.end())
+ {
+ return It->Handle;
+ }
+ }
+
+ CacheBucket::GetBatchHandle* NewBucketHandle = Bucket->BeginGetBatch(OutResults);
+ if (NewBucketHandle == nullptr)
+ {
+ return nullptr;
+ }
+
+ RwLock::ExclusiveLockScope _(Lock);
+ if (auto It =
+ std::find_if(BucketHandles.begin(), BucketHandles.end(), [&](BucketHandle& Handle) { return Handle.Bucket == Bucket; });
+ It != BucketHandles.end())
+ {
+ CacheBucket::GetBatchHandle* Result = It->Handle;
+ ZEN_ASSERT(Result != nullptr);
+ _.ReleaseNow();
+ Bucket->EndGetBatch(NewBucketHandle);
+ return Result;
+ }
+
+ BucketHandles.push_back(ZenCacheDiskLayer::GetBatchHandle::BucketHandle{.Bucket = Bucket, .Handle = NewBucketHandle});
+
+ return NewBucketHandle;
+ }
+
+ RwLock Lock;
+ std::vector<BucketHandle> BucketHandles;
+ std::vector<ZenCacheValue>& OutResults;
+};
+
+ZenCacheDiskLayer::GetBatchHandle*
+ZenCacheDiskLayer::BeginGetBatch(std::vector<ZenCacheValue>& OutResults)
+{
+ return new GetBatchHandle(OutResults);
}
void
-ZenCacheDiskLayer::EndPutBatch(BatchHandle* Batch) noexcept
+ZenCacheDiskLayer::EndGetBatch(GetBatchHandle* Batch) noexcept
{
+ ZEN_TRACE_CPU("Z$::GetBatched");
ZEN_ASSERT(Batch);
- Batch->ForEach([&](CacheBucket* Bucket, CacheBucket::BatchHandle* Handle) { Bucket->EndPutBatch(Handle); });
+ Batch->ForEach([&](CacheBucket* Bucket, CacheBucket::GetBatchHandle* Handle) { Bucket->EndGetBatch(Handle); });
+ TryMemCacheTrim();
delete Batch;
}
@@ -3630,17 +3933,28 @@ ZenCacheDiskLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCach
}
void
+ZenCacheDiskLayer::Get(std::string_view InBucket, const IoHash& HashKey, GetBatchHandle& BatchHandle)
+{
+ ZEN_TRACE_CPU("Z$::GetBatched");
+
+ if (CacheBucket* Bucket = GetOrCreateBucket(InBucket); Bucket != nullptr)
+ {
+ Bucket->Get(HashKey, *BatchHandle.GetHandle(Bucket));
+ }
+}
+
+void
ZenCacheDiskLayer::Put(std::string_view InBucket,
const IoHash& HashKey,
const ZenCacheValue& Value,
std::span<IoHash> References,
- BatchHandle* OptionalBatchHandle)
+ PutBatchHandle* OptionalBatchHandle)
{
ZEN_TRACE_CPU("Z$::Put");
if (CacheBucket* Bucket = GetOrCreateBucket(InBucket); Bucket != nullptr)
{
- CacheBucket::BatchHandle* BucketBatchHandle = OptionalBatchHandle == nullptr ? nullptr : OptionalBatchHandle->GetHandle(Bucket);
+ CacheBucket::PutBatchHandle* BucketBatchHandle = OptionalBatchHandle == nullptr ? nullptr : OptionalBatchHandle->GetHandle(Bucket);
Bucket->Put(HashKey, Value, References, BucketBatchHandle);
TryMemCacheTrim();
}