aboutsummaryrefslogtreecommitdiff
path: root/src/zenstore/cache
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2024-06-04 19:30:34 +0200
committerGitHub Enterprise <[email protected]>2024-06-04 19:30:34 +0200
commitbbe46452530a98a0bd36c0024d4f3f914ae23604 (patch)
treee9c901a6ec68d087bc7e746b38d1573b2999a0ef /src/zenstore/cache
parentUse a smaller thread pool for network operations when doing oplog import to r... (diff)
downloadzen-bbe46452530a98a0bd36c0024d4f3f914ae23604.tar.xz
zen-bbe46452530a98a0bd36c0024d4f3f914ae23604.zip
add batching of CacheStore requests for GetCacheValues/GetCacheChunks (#90)
* cache file size of block on open * add ability to control size limit for small chunk callback when iterating block * Add batch fetch of cache values in the GetCacheValues request
Diffstat (limited to 'src/zenstore/cache')
-rw-r--r--src/zenstore/cache/cachedisklayer.cpp360
-rw-r--r--src/zenstore/cache/cacherpc.cpp166
-rw-r--r--src/zenstore/cache/structuredcachestore.cpp145
3 files changed, 583 insertions, 88 deletions
diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp
index d67e8d6c8..9dd2e4a67 100644
--- a/src/zenstore/cache/cachedisklayer.cpp
+++ b/src/zenstore/cache/cachedisklayer.cpp
@@ -1170,9 +1170,9 @@ ZenCacheDiskLayer::CacheBucket::GetStandaloneCacheValue(const DiskLocation& Loc,
return {};
}
-struct ZenCacheDiskLayer::CacheBucket::BatchHandle
+struct ZenCacheDiskLayer::CacheBucket::PutBatchHandle
{
- BatchHandle(std::vector<bool>& OutResults) : OutResults(OutResults) {}
+ PutBatchHandle(std::vector<bool>& OutResults) : OutResults(OutResults) {}
struct Entry
{
std::vector<IoHash> HashKeyAndReferences;
@@ -1184,14 +1184,14 @@ struct ZenCacheDiskLayer::CacheBucket::BatchHandle
std::vector<bool>& OutResults;
};
-ZenCacheDiskLayer::CacheBucket::BatchHandle*
+ZenCacheDiskLayer::CacheBucket::PutBatchHandle*
ZenCacheDiskLayer::CacheBucket::BeginPutBatch(std::vector<bool>& OutResults)
{
- return new BatchHandle(OutResults);
+ return new PutBatchHandle(OutResults);
}
void
-ZenCacheDiskLayer::CacheBucket::EndPutBatch(BatchHandle* Batch) noexcept
+ZenCacheDiskLayer::CacheBucket::EndPutBatch(PutBatchHandle* Batch) noexcept
{
try
{
@@ -1272,6 +1272,232 @@ ZenCacheDiskLayer::CacheBucket::EndPutBatch(BatchHandle* Batch) noexcept
}
}
+// Per-bucket bookkeeping for a batched Get: the keys queued so far and, for each key, the slot in the
+// shared result vector that receives the answer when the batch is ended.
+struct ZenCacheDiskLayer::CacheBucket::GetBatchHandle
+{
+    GetBatchHandle(std::vector<ZenCacheValue>& OutResults) : OutResults(OutResults)
+    {
+        // Size the bookkeeping vectors from whatever the caller reserved for the results
+        const size_t ExpectedCount = OutResults.capacity();
+        Keys.reserve(ExpectedCount);
+        ResultIndexes.reserve(ExpectedCount);
+    }
+
+    // Keys queued through Get(), in call order
+    std::vector<IoHash> Keys;
+    // ResultIndexes[i] is the index into OutResults that belongs to Keys[i]
+    std::vector<size_t> ResultIndexes;
+
+    // Result storage handed in at construction; held by reference, not owned
+    std::vector<ZenCacheValue>& OutResults;
+};
+
+// Starts a batched Get against this bucket. The returned handle is heap allocated and collects the keys
+// queued through Get(); results are written into OutResult.
+ZenCacheDiskLayer::CacheBucket::GetBatchHandle*
+ZenCacheDiskLayer::CacheBucket::BeginGetBatch(std::vector<ZenCacheValue>& OutResult)
+{
+    ZEN_TRACE_CPU("Z$::Bucket::GetBatched");
+    GetBatchHandle* NewBatch = new GetBatchHandle(OutResult);
+    return NewBatch;
+}
+
+// Resolves every key queued on this bucket's batch, writes the outcome into the result slots recorded in
+// Batch->ResultIndexes and then releases Batch (the handle allocated by BeginGetBatch()).
+//
+// The work happens in three steps:
+//  1. Under the shared index lock every key is looked up. Payloads that are memory cached are served
+//     directly; the other hits are sorted into a "standalone file" and an "inline in block store" list.
+//  2. The inline payloads are read block by block and the standalone payloads one by one. FillOne()
+//     completes each value and takes the exclusive index lock only when it has meta data or a memory
+//     cache entry to record.
+//  3. The hit/miss counters are updated from the final state of the result slots.
+void
+ZenCacheDiskLayer::CacheBucket::EndGetBatch(GetBatchHandle* Batch) noexcept
+{
+    ZEN_TRACE_CPU("Z$::Bucket::GetBatched");
+
+    ZEN_ASSERT(Batch);
+    ZEN_ASSERT(Batch->Keys.size() == Batch->ResultIndexes.size());
+
+    metrics::RequestStats::Scope StatsScope(m_GetOps, 0);
+
+    if (!Batch->ResultIndexes.empty())
+    {
+        // Work lists for the payloads that must be read from disk. The *KeyIndexes vectors map each
+        // entry back to its position in Batch->Keys.
+        std::vector<DiskLocation>       StandaloneDiskLocations;
+        std::vector<size_t>             StandaloneKeyIndexes;
+        std::vector<DiskLocation>       InlineDiskLocations;
+        std::vector<BlockStoreLocation> InlineBlockLocations;
+        std::vector<size_t>             InlineKeyIndexes;
+        // Per key: true when RawHash/RawSize still have to be derived from the payload itself
+        std::vector<bool> FillRawHashAndRawSize(Batch->Keys.size(), false);
+        {
+            RwLock::SharedLockScope IndexLock(m_IndexLock);
+            for (size_t KeyIndex = 0; KeyIndex < Batch->Keys.size(); KeyIndex++)
+            {
+                const IoHash& HashKey = Batch->Keys[KeyIndex];
+                auto          It      = m_Index.find(HashKey);
+                if (It != m_Index.end())
+                {
+                    size_t         ResultIndex = Batch->ResultIndexes[KeyIndex];
+                    ZenCacheValue& OutValue    = Batch->OutResults[ResultIndex];
+
+                    const PayloadIndex PayloadIdx = It.value();
+                    m_AccessTimes[PayloadIdx]     = GcClock::TickCount();
+                    const BucketPayload& Payload  = m_Payloads[PayloadIdx];
+                    const DiskLocation&  Location = Payload.Location;
+
+                    FillRawHashAndRawSize[KeyIndex] = (!Location.IsFlagSet(DiskLocation::kStructured)) && (Location.Size() > 0);
+                    if (Payload.MetaData)
+                    {
+                        // Meta data is already recorded for this entry, no need to derive it again
+                        const BucketMetaData& MetaData  = m_MetaDatas[Payload.MetaData];
+                        OutValue.RawHash                = MetaData.RawHash;
+                        OutValue.RawSize                = MetaData.RawSize;
+                        FillRawHashAndRawSize[KeyIndex] = false;
+                    }
+
+                    if (Payload.MemCached)
+                    {
+                        ZEN_ASSERT(!FillRawHashAndRawSize[KeyIndex]);
+                        OutValue.Value = m_MemCachedPayloads[Payload.MemCached].Payload;
+                        m_MemoryHitCount++;
+                    }
+                    else
+                    {
+                        if (m_Configuration.MemCacheSizeThreshold > 0)
+                        {
+                            m_MemoryMissCount++;
+                        }
+                        if (Location.IsFlagSet(DiskLocation::kStandaloneFile))
+                        {
+                            StandaloneDiskLocations.push_back(Location);
+                            StandaloneKeyIndexes.push_back(KeyIndex);
+                        }
+                        else
+                        {
+                            InlineDiskLocations.push_back(Location);
+                            InlineBlockLocations.emplace_back(Location.GetBlockLocation(m_Configuration.PayloadAlignment));
+                            InlineKeyIndexes.push_back(KeyIndex);
+                        }
+                    }
+                }
+            }
+        }
+
+        // Completes one result slot from a payload that was just read. An empty Value leaves the slot
+        // untouched, which the counting loop at the end treats as a miss.
+        auto FillOne = [&](const DiskLocation& Location, size_t KeyIndex, IoBuffer&& Value) {
+            if (!Value)
+            {
+                return;
+            }
+            size_t         ResultIndex = Batch->ResultIndexes[KeyIndex];
+            ZenCacheValue& OutValue    = Batch->OutResults[ResultIndex];
+            OutValue.Value             = std::move(Value);
+            OutValue.Value.SetContentType(Location.GetContentType());
+
+            bool AddToMemCache = false;
+            bool SetMetaInfo   = FillRawHashAndRawSize[KeyIndex];
+            if (m_Configuration.MemCacheSizeThreshold > 0)
+            {
+                size_t ValueSize = OutValue.Value.GetSize();
+                if (OutValue.Value && ValueSize <= m_Configuration.MemCacheSizeThreshold)
+                {
+                    OutValue.Value = IoBufferBuilder::ReadFromFileMaybe(OutValue.Value);
+                    AddToMemCache  = true;
+                }
+            }
+
+            if (SetMetaInfo)
+            {
+                ZEN_TRACE_CPU("Z$::Bucket::EndGetBatch::MetaData");
+                if (Location.IsFlagSet(DiskLocation::kCompressed))
+                {
+                    if (!CompressedBuffer::ValidateCompressedHeader(OutValue.Value, OutValue.RawHash, OutValue.RawSize))
+                    {
+                        // Invalid compressed header: report a miss and stop here, otherwise the cleared
+                        // value and its hash/size would be recorded in the index and memory cache below
+                        OutValue = ZenCacheValue{};
+                        return;
+                    }
+                }
+                else
+                {
+                    OutValue.RawHash = IoHash::HashBuffer(OutValue.Value);
+                    OutValue.RawSize = OutValue.Value.GetSize();
+                }
+            }
+
+            if (SetMetaInfo || AddToMemCache)
+            {
+                ZEN_TRACE_CPU("Z$::Bucket::EndGetBatch::MemCache");
+                const IoHash&              Key = Batch->Keys[KeyIndex];
+                RwLock::ExclusiveLockScope UpdateIndexLock(m_IndexLock);
+                // The shared lock was released after the lookup, so find the entry again before updating it
+                if (auto UpdateIt = m_Index.find(Key); UpdateIt != m_Index.end())
+                {
+                    BucketPayload& Payload = m_Payloads[UpdateIt->second];
+
+                    // Only update if it has not already been updated by other thread
+                    if (!Payload.MetaData && SetMetaInfo)
+                    {
+                        SetMetaData(UpdateIndexLock, Payload, {.RawSize = OutValue.RawSize, .RawHash = OutValue.RawHash});
+                    }
+                    if (!Payload.MemCached && AddToMemCache)
+                    {
+                        SetMemCachedData(UpdateIndexLock, UpdateIt->second, OutValue.Value);
+                    }
+                }
+            }
+        };
+
+        // We don't want to read into memory if they are too big since we might only want to touch the compressed
+        // header before sending it along
+        if (!InlineDiskLocations.empty())
+        {
+            ZEN_TRACE_CPU("Z$::Bucket::EndGetBatch::ReadInline");
+            // The limit is the same for every block, so compute it once up front
+            const uint64_t LargeChunkSizeLimit = Max(m_Configuration.MemCacheSizeThreshold, 32u * 1024u);
+            m_BlockStore.IterateChunks(InlineBlockLocations, [&](uint32_t, std::span<const size_t> ChunkIndexes) -> bool {
+                m_BlockStore.IterateBlock(
+                    InlineBlockLocations,
+                    ChunkIndexes,
+                    // Chunk delivered as a memory range: clone it into a buffer for the result
+                    [&FillOne, &InlineDiskLocations, &InlineKeyIndexes](size_t ChunkIndex, const void* Data, uint64_t Size) -> bool {
+                        if (Data != nullptr)
+                        {
+                            FillOne(InlineDiskLocations[ChunkIndex],
+                                    InlineKeyIndexes[ChunkIndex],
+                                    IoBufferBuilder::MakeCloneFromMemory(Data, Size));
+                        }
+                        return true;
+                    },
+                    // Chunk delivered as a file range: fetch it from the block file
+                    [&FillOne, &InlineDiskLocations, &InlineKeyIndexes](size_t          ChunkIndex,
+                                                                        BlockStoreFile& File,
+                                                                        uint64_t        Offset,
+                                                                        uint64_t        Size) -> bool {
+                        FillOne(InlineDiskLocations[ChunkIndex], InlineKeyIndexes[ChunkIndex], File.GetChunk(Offset, Size));
+                        return true;
+                    },
+                    LargeChunkSizeLimit);
+                return true;
+            });
+        }
+
+        if (!StandaloneDiskLocations.empty())
+        {
+            ZEN_TRACE_CPU("Z$::Bucket::EndGetBatch::ReadStandalone");
+            for (size_t Index = 0; Index < StandaloneDiskLocations.size(); Index++)
+            {
+                size_t              KeyIndex = StandaloneKeyIndexes[Index];
+                const DiskLocation& Location = StandaloneDiskLocations[Index];
+                FillOne(Location, KeyIndex, GetStandaloneCacheValue(Location, Batch->Keys[KeyIndex]));
+            }
+        }
+
+        // NOTE(review): a slot served from the memory cache also has a Value and is therefore counted as a
+        // disk hit here - confirm that is the intended accounting
+        for (size_t ResultIndex : Batch->ResultIndexes)
+        {
+            bool Hit = !!Batch->OutResults[ResultIndex].Value;
+            if (Hit)
+            {
+                m_DiskHitCount++;
+                StatsScope.SetBytes(Batch->OutResults[ResultIndex].Value.GetSize());
+            }
+            else
+            {
+                m_DiskMissCount++;
+                if (m_Configuration.MemCacheSizeThreshold > 0)
+                {
+                    m_MemoryMissCount++;
+                }
+            }
+        }
+    }
+
+    // BeginGetBatch() allocated the handle with new; ending the batch is where it gets released
+    delete Batch;
+}
+
+// Queues HashKey on the batch and claims the next slot in the shared result vector for it. Nothing is
+// looked up here; the slot stays empty until the batch is ended.
+void
+ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, GetBatchHandle& BatchHandle)
+{
+    ZEN_TRACE_CPU("Z$::Bucket::GetBatched");
+    const size_t SlotIndex = BatchHandle.OutResults.size();
+    BatchHandle.OutResults.push_back({});
+    BatchHandle.Keys.push_back(HashKey);
+    BatchHandle.ResultIndexes.push_back(SlotIndex);
+}
+
bool
ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutValue)
{
@@ -1379,6 +1605,7 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal
}
}
}
+
if (OutValue.Value)
{
m_DiskHitCount++;
@@ -1396,7 +1623,7 @@ void
ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey,
const ZenCacheValue& Value,
std::span<IoHash> References,
- BatchHandle* OptionalBatchHandle)
+ PutBatchHandle* OptionalBatchHandle)
{
ZEN_TRACE_CPU("Z$::Bucket::Put");
@@ -2717,7 +2944,7 @@ void
ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey,
const ZenCacheValue& Value,
std::span<IoHash> References,
- BatchHandle* OptionalBatchHandle)
+ PutBatchHandle* OptionalBatchHandle)
{
ZEN_TRACE_CPU("Z$::Bucket::PutInlineCacheValue");
if (OptionalBatchHandle != nullptr)
@@ -3540,19 +3767,19 @@ ZenCacheDiskLayer::GetOrCreateBucket(std::string_view InBucket)
return Result;
}
-struct ZenCacheDiskLayer::BatchHandle
+struct ZenCacheDiskLayer::PutBatchHandle
{
- BatchHandle(std::vector<bool>& OutResults) : OutResults(OutResults) {}
+ PutBatchHandle(std::vector<bool>& OutResults) : OutResults(OutResults) {}
struct BucketHandle
{
- CacheBucket* Bucket;
- CacheBucket::BatchHandle* Handle;
+ CacheBucket* Bucket;
+ CacheBucket::PutBatchHandle* Handle;
};
- void ForEach(const std::function<void(CacheBucket* Bucket, CacheBucket::BatchHandle* Handle)>& CB) noexcept
+ void ForEach(const std::function<void(CacheBucket* Bucket, CacheBucket::PutBatchHandle* Handle)>& CB) noexcept
{
RwLock::SharedLockScope _(Lock);
- for (ZenCacheDiskLayer::BatchHandle::BucketHandle& BucketHandle : BucketHandles)
+ for (ZenCacheDiskLayer::PutBatchHandle::BucketHandle& BucketHandle : BucketHandles)
{
ZEN_ASSERT(BucketHandle.Bucket);
ZEN_ASSERT(BucketHandle.Handle);
@@ -3560,7 +3787,7 @@ struct ZenCacheDiskLayer::BatchHandle
}
}
- CacheBucket::BatchHandle* GetHandle(CacheBucket* Bucket)
+ CacheBucket::PutBatchHandle* GetHandle(CacheBucket* Bucket)
{
{
RwLock::SharedLockScope _(Lock);
@@ -3572,7 +3799,7 @@ struct ZenCacheDiskLayer::BatchHandle
}
}
- CacheBucket::BatchHandle* NewBucketHandle = Bucket->BeginPutBatch(OutResults);
+ CacheBucket::PutBatchHandle* NewBucketHandle = Bucket->BeginPutBatch(OutResults);
if (NewBucketHandle == nullptr)
{
return nullptr;
@@ -3583,14 +3810,14 @@ struct ZenCacheDiskLayer::BatchHandle
std::find_if(BucketHandles.begin(), BucketHandles.end(), [&](BucketHandle& Handle) { return Handle.Bucket == Bucket; });
It != BucketHandles.end())
{
- CacheBucket::BatchHandle* Result = It->Handle;
+ CacheBucket::PutBatchHandle* Result = It->Handle;
ZEN_ASSERT(Result != nullptr);
_.ReleaseNow();
Bucket->EndPutBatch(NewBucketHandle);
return Result;
}
- BucketHandles.push_back(ZenCacheDiskLayer::BatchHandle::BucketHandle{.Bucket = Bucket, .Handle = NewBucketHandle});
+ BucketHandles.push_back(ZenCacheDiskLayer::PutBatchHandle::BucketHandle{.Bucket = Bucket, .Handle = NewBucketHandle});
return NewBucketHandle;
}
@@ -3599,17 +3826,93 @@ struct ZenCacheDiskLayer::BatchHandle
std::vector<bool>& OutResults;
};
-ZenCacheDiskLayer::BatchHandle*
+ZenCacheDiskLayer::PutBatchHandle*
ZenCacheDiskLayer::BeginPutBatch(std::vector<bool>& OutResults)
{
- return new BatchHandle(OutResults);
+ return new PutBatchHandle(OutResults);
+}
+
+// Ends the put batch of every bucket that took part in Batch, then releases the disk layer handle.
+void
+ZenCacheDiskLayer::EndPutBatch(PutBatchHandle* Batch) noexcept
+{
+    ZEN_ASSERT(Batch);
+    const auto EndBucketBatch = [](CacheBucket* Bucket, CacheBucket::PutBatchHandle* Handle) { Bucket->EndPutBatch(Handle); };
+    Batch->ForEach(EndBucketBatch);
+    delete Batch;
+}
+
+// Disk layer state for a batched Get: one per-bucket batch handle for every bucket touched so far, all of
+// them writing into the same result vector.
+struct ZenCacheDiskLayer::GetBatchHandle
+{
+    GetBatchHandle(std::vector<ZenCacheValue>& OutResults) : OutResults(OutResults) {}
+    struct BucketHandle
+    {
+        CacheBucket*                 Bucket;
+        CacheBucket::GetBatchHandle* Handle;
+    };
+
+    // Calls CB once for every registered bucket/handle pair while holding the shared lock.
+    void ForEach(const std::function<void(CacheBucket* Bucket, CacheBucket::GetBatchHandle* Handle)>& CB) noexcept
+    {
+        RwLock::SharedLockScope _(Lock);
+        for (BucketHandle& Entry : BucketHandles)
+        {
+            ZEN_ASSERT(Entry.Bucket);
+            ZEN_ASSERT(Entry.Handle);
+            CB(Entry.Bucket, Entry.Handle);
+        }
+    }
+
+    // Returns the batch handle registered for Bucket, creating and registering one on first use.
+    // Returns nullptr only if the bucket fails to start a batch.
+    CacheBucket::GetBatchHandle* GetHandle(CacheBucket* Bucket)
+    {
+        const auto IsForBucket = [Bucket](const BucketHandle& Entry) { return Entry.Bucket == Bucket; };
+
+        {
+            RwLock::SharedLockScope SharedLock(Lock);
+            const auto              Existing = std::find_if(BucketHandles.begin(), BucketHandles.end(), IsForBucket);
+            if (Existing != BucketHandles.end())
+            {
+                return Existing->Handle;
+            }
+        }
+
+        // Not registered yet: start the bucket batch without holding any lock
+        CacheBucket::GetBatchHandle* Created = Bucket->BeginGetBatch(OutResults);
+        if (Created == nullptr)
+        {
+            return nullptr;
+        }
+
+        RwLock::ExclusiveLockScope ExclusiveLock(Lock);
+        const auto                 Raced = std::find_if(BucketHandles.begin(), BucketHandles.end(), IsForBucket);
+        if (Raced != BucketHandles.end())
+        {
+            // A handle for this bucket was registered while the lock was not held; keep that one and end
+            // the batch that was just started
+            CacheBucket::GetBatchHandle* Winner = Raced->Handle;
+            ZEN_ASSERT(Winner != nullptr);
+            ExclusiveLock.ReleaseNow();
+            Bucket->EndGetBatch(Created);
+            return Winner;
+        }
+
+        BucketHandles.push_back(BucketHandle{.Bucket = Bucket, .Handle = Created});
+
+        return Created;
+    }
+
+    RwLock                      Lock;
+    std::vector<BucketHandle>   BucketHandles;
+    std::vector<ZenCacheValue>& OutResults;
+};
+
+// Starts a batched Get on the disk layer. The returned handle is heap allocated; per-bucket batches are
+// created through it as buckets are touched and all results go into OutResults.
+ZenCacheDiskLayer::GetBatchHandle*
+ZenCacheDiskLayer::BeginGetBatch(std::vector<ZenCacheValue>& OutResults)
+{
+    GetBatchHandle* NewBatch = new GetBatchHandle(OutResults);
+    return NewBatch;
}
void
-ZenCacheDiskLayer::EndPutBatch(BatchHandle* Batch) noexcept
+ZenCacheDiskLayer::EndGetBatch(GetBatchHandle* Batch) noexcept
{
+ ZEN_TRACE_CPU("Z$::GetBatched");
ZEN_ASSERT(Batch);
- Batch->ForEach([&](CacheBucket* Bucket, CacheBucket::BatchHandle* Handle) { Bucket->EndPutBatch(Handle); });
+ Batch->ForEach([&](CacheBucket* Bucket, CacheBucket::GetBatchHandle* Handle) { Bucket->EndGetBatch(Handle); });
+ TryMemCacheTrim();
delete Batch;
}
@@ -3630,17 +3933,28 @@ ZenCacheDiskLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCach
}
+// Queues a lookup of HashKey in bucket InBucket on the batch. Exactly one result slot is claimed per
+// call, so the result vector stays in the order the lookups were queued.
void
+ZenCacheDiskLayer::Get(std::string_view InBucket, const IoHash& HashKey, GetBatchHandle& BatchHandle)
+{
+    ZEN_TRACE_CPU("Z$::GetBatched");
+
+    if (CacheBucket* Bucket = GetOrCreateBucket(InBucket); Bucket != nullptr)
+    {
+        // GetHandle() can return nullptr, so check before dereferencing
+        if (CacheBucket::GetBatchHandle* BucketBatchHandle = BatchHandle.GetHandle(Bucket); BucketBatchHandle != nullptr)
+        {
+            Bucket->Get(HashKey, *BucketBatchHandle);
+            return;
+        }
+    }
+
+    // No bucket or no bucket batch available: still claim a slot (left empty, i.e. a miss) so later
+    // results do not shift into this request's position
+    BatchHandle.OutResults.push_back({});
+}
+
+void
ZenCacheDiskLayer::Put(std::string_view InBucket,
const IoHash& HashKey,
const ZenCacheValue& Value,
std::span<IoHash> References,
- BatchHandle* OptionalBatchHandle)
+ PutBatchHandle* OptionalBatchHandle)
{
ZEN_TRACE_CPU("Z$::Put");
if (CacheBucket* Bucket = GetOrCreateBucket(InBucket); Bucket != nullptr)
{
- CacheBucket::BatchHandle* BucketBatchHandle = OptionalBatchHandle == nullptr ? nullptr : OptionalBatchHandle->GetHandle(Bucket);
+ CacheBucket::PutBatchHandle* BucketBatchHandle = OptionalBatchHandle == nullptr ? nullptr : OptionalBatchHandle->GetHandle(Bucket);
Bucket->Put(HashKey, Value, References, BucketBatchHandle);
TryMemCacheTrim();
}
diff --git a/src/zenstore/cache/cacherpc.cpp b/src/zenstore/cache/cacherpc.cpp
index d28eda8c4..f6e5d16b3 100644
--- a/src/zenstore/cache/cacherpc.cpp
+++ b/src/zenstore/cache/cacherpc.cpp
@@ -985,69 +985,93 @@ CacheRpcHandler::HandleRpcGetCacheValues(const CacheRequestContext& Context, CbO
const bool HasUpstream = m_UpstreamCache.IsActive();
- CbArrayView RequestsArray = Params["Requests"sv].AsArrayView();
- for (CbFieldView RequestField : RequestsArray)
+ CbArrayView RequestsArray = Params["Requests"sv].AsArrayView();
+ std::vector<ZenCacheValue> CacheValues;
+ const uint64_t RequestCount = RequestsArray.Num();
+ CacheValues.reserve(RequestCount);
{
- ZEN_TRACE_CPU("Z$::RpcGetCacheValues::Request");
-
- m_CacheStats.RpcValueBatchRequests.fetch_add(1);
+ std::unique_ptr<ZenCacheStore::GetBatch> Batch;
+ if (RequestCount > 1)
+ {
+ Batch = std::make_unique<ZenCacheStore::GetBatch>(m_CacheStore, *Namespace, CacheValues);
+ }
+ for (CbFieldView RequestField : RequestsArray)
+ {
+ ZEN_TRACE_CPU("Z$::RpcGetCacheValues::Request");
- Stopwatch Timer;
+ m_CacheStats.RpcValueBatchRequests.fetch_add(1);
- RequestData& Request = Requests.emplace_back();
- CbObjectView RequestObject = RequestField.AsObjectView();
- CbObjectView KeyObject = RequestObject["Key"sv].AsObjectView();
+ RequestData& Request = Requests.emplace_back();
+ CbObjectView RequestObject = RequestField.AsObjectView();
+ CbObjectView KeyObject = RequestObject["Key"sv].AsObjectView();
- if (!GetRpcRequestCacheKey(KeyObject, Request.Key))
- {
- return CbPackage{};
- }
+ if (!GetRpcRequestCacheKey(KeyObject, Request.Key))
+ {
+ return CbPackage{};
+ }
- PolicyText = RequestObject["Policy"sv].AsString();
- Request.Policy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : DefaultPolicy;
+ PolicyText = RequestObject["Policy"sv].AsString();
+ Request.Policy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : DefaultPolicy;
- CacheKey& Key = Request.Key;
- CachePolicy Policy = Request.Policy;
+ CacheKey& Key = Request.Key;
+ CachePolicy Policy = Request.Policy;
- ZenCacheValue CacheValue;
- if (EnumHasAllFlags(Policy, CachePolicy::QueryLocal))
+ ZenCacheValue CacheValue;
+ if (EnumHasAllFlags(Policy, CachePolicy::QueryLocal))
+ {
+ if (Batch)
+ {
+ m_CacheStore.Get(Context, *Namespace, Key.Bucket, Key.Hash, *Batch.get());
+ }
+ else
+ {
+ CacheValues.push_back({});
+ m_CacheStore.Get(Context, *Namespace, Key.Bucket, Key.Hash, CacheValues.back());
+ }
+ }
+ else
+ {
+ CacheValues.push_back({});
+ }
+ }
+ Batch.reset();
+ ZEN_ASSERT(CacheValues.size() == RequestsArray.Num());
+ }
+ for (size_t RequestIndex = 0; RequestIndex < CacheValues.size(); RequestIndex++)
+ {
+ RequestData& Request = Requests[RequestIndex];
+ ZenCacheValue& Value = CacheValues[RequestIndex];
+ if (Value.Value)
{
- if (m_CacheStore.Get(Context, *Namespace, Key.Bucket, Key.Hash, CacheValue) &&
- IsCompressedBinary(CacheValue.Value.GetContentType()))
+ if (IsCompressedBinary(Value.Value.GetContentType()))
{
- Request.RawHash = CacheValue.RawHash;
- Request.RawSize = CacheValue.RawSize;
- Request.Result = CompressedBuffer::FromCompressedNoValidate(std::move(CacheValue.Value));
+ Request.RawHash = Value.RawHash;
+ Request.RawSize = Value.RawSize;
+ Request.Result = CompressedBuffer::FromCompressedNoValidate(std::move(Value.Value));
}
}
if (Request.Result)
{
- ZEN_DEBUG("GETCACHEVALUES HIT - '{}/{}/{}' {} ({}) in {}",
+ ZEN_DEBUG("GETCACHEVALUES HIT - '{}/{}/{}' {} ({})",
*Namespace,
- Key.Bucket,
- Key.Hash,
+ Request.Key.Bucket,
+ Request.Key.Hash,
NiceBytes(Request.Result.GetCompressed().GetSize()),
- "LOCAL"sv,
- NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
+ "LOCAL"sv);
m_CacheStats.HitCount++;
}
- else if (HasUpstream && EnumHasAllFlags(Policy, CachePolicy::QueryRemote))
+ else if (HasUpstream && EnumHasAllFlags(Request.Policy, CachePolicy::QueryRemote))
{
- RemoteRequestIndexes.push_back(Requests.size() - 1);
+ RemoteRequestIndexes.push_back(RequestIndex);
}
- else if (!EnumHasAnyFlags(Policy, CachePolicy::Query))
+ else if (!EnumHasAnyFlags(Request.Policy, CachePolicy::Query))
{
// If they requested no query, do not record this as a miss
- ZEN_DEBUG("GETCACHEVALUES DISABLEDQUERY - '{}/{}/{}'", *Namespace, Key.Bucket, Key.Hash);
+ ZEN_DEBUG("GETCACHEVALUES DISABLEDQUERY - '{}/{}/{}'", *Namespace, Request.Key.Bucket, Request.Key.Hash);
}
else
{
- ZEN_DEBUG("GETCACHEVALUES MISS - '{}/{}/{}' ({}) in {}",
- *Namespace,
- Key.Bucket,
- Key.Hash,
- "LOCAL"sv,
- NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
+ ZEN_DEBUG("GETCACHEVALUES MISS - '{}/{}/{}' ({})", *Namespace, Request.Key.Bucket, Request.Key.Hash, "LOCAL"sv);
m_CacheStats.MissCount++;
}
}
@@ -1497,29 +1521,61 @@ CacheRpcHandler::GetLocalCacheValues(const CacheRequestContext& Context,
using namespace cache::detail;
const bool HasUpstream = m_UpstreamCache.IsActive();
- for (ChunkRequest* Request : ValueRequests)
+ std::vector<ZenCacheValue> Chunks;
+ Chunks.reserve(ValueRequests.size());
{
- ZEN_TRACE_CPU("Z$::GetLocalCacheValues::Value");
-
- Stopwatch Timer;
- if (!Request->Exists && EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::QueryLocal))
+ std::unique_ptr<ZenCacheStore::GetBatch> Batch;
+ if (ValueRequests.size() > 1)
{
- ZenCacheValue CacheValue;
- if (m_CacheStore.Get(Context, Namespace, Request->Key->Key.Bucket, Request->Key->Key.Hash, CacheValue))
+ Batch = std::make_unique<ZenCacheStore::GetBatch>(m_CacheStore, Namespace, Chunks);
+ }
+ for (ChunkRequest* Request : ValueRequests)
+ {
+ Stopwatch Timer;
+ if (!Request->Exists && EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::QueryLocal))
{
- if (IsCompressedBinary(CacheValue.Value.GetContentType()))
+ if (Batch)
{
- Request->Key->ChunkId = CacheValue.RawHash;
- Request->Exists = true;
- Request->RawSize = CacheValue.RawSize;
- Request->RawSizeKnown = true;
- if (!EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::SkipData))
- {
- Request->Value = CompressedBuffer::FromCompressedNoValidate(std::move(CacheValue.Value));
- }
+ m_CacheStore.Get(Context, Namespace, Request->Key->Key.Bucket, Request->Key->Key.Hash, *Batch.get());
+ }
+ else
+ {
+ Chunks.push_back({});
+ m_CacheStore.Get(Context, Namespace, Request->Key->Key.Bucket, Request->Key->Key.Hash, Chunks.back());
+ }
+ }
+ else
+ {
+ Chunks.push_back({});
+ }
+ Request->ElapsedTimeUs += Timer.GetElapsedTimeUs();
+ }
+ }
+ for (size_t RequestIndex = 0; RequestIndex < ValueRequests.size(); RequestIndex++)
+ {
+ Stopwatch Timer;
+ ChunkRequest* Request = ValueRequests[RequestIndex];
+ if (Chunks[RequestIndex].Value)
+ {
+ if (IsCompressedBinary(Chunks[RequestIndex].Value.GetContentType()))
+ {
+ Request->Key->ChunkId = Chunks[RequestIndex].RawHash;
+ Request->Exists = true;
+ Request->RawSize = Chunks[RequestIndex].RawSize;
+ Request->RawSizeKnown = true;
+ if (!EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::SkipData))
+ {
+ Request->Value = CompressedBuffer::FromCompressedNoValidate(std::move(Chunks[RequestIndex].Value));
}
}
}
+ Request->ElapsedTimeUs += Timer.GetElapsedTimeUs();
+ }
+
+ for (ChunkRequest* Request : ValueRequests)
+ {
+ ZEN_TRACE_CPU("Z$::GetLocalCacheValues::Value");
+ Stopwatch Timer;
if (HasUpstream && !Request->Exists && EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::QueryRemote))
{
if (EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::StoreLocal))
diff --git a/src/zenstore/cache/structuredcachestore.cpp b/src/zenstore/cache/structuredcachestore.cpp
index 9eded2a50..d9c2d3e59 100644
--- a/src/zenstore/cache/structuredcachestore.cpp
+++ b/src/zenstore/cache/structuredcachestore.cpp
@@ -137,21 +137,21 @@ ZenCacheNamespace::~ZenCacheNamespace()
m_Gc.RemoveGcContributor(this);
}
-struct ZenCacheNamespace::BatchHandle
+struct ZenCacheNamespace::PutBatchHandle
{
- ZenCacheDiskLayer::BatchHandle* DiskLayerHandle = nullptr;
+ ZenCacheDiskLayer::PutBatchHandle* DiskLayerHandle = nullptr;
};
-ZenCacheNamespace::BatchHandle*
+ZenCacheNamespace::PutBatchHandle*
ZenCacheNamespace::BeginPutBatch(std::vector<bool>& OutResult)
{
- ZenCacheNamespace::BatchHandle* Handle = new ZenCacheNamespace::BatchHandle;
- Handle->DiskLayerHandle = m_DiskLayer.BeginPutBatch(OutResult);
+ ZenCacheNamespace::PutBatchHandle* Handle = new ZenCacheNamespace::PutBatchHandle;
+ Handle->DiskLayerHandle = m_DiskLayer.BeginPutBatch(OutResult);
return Handle;
}
void
-ZenCacheNamespace::EndPutBatch(BatchHandle* Batch) noexcept
+ZenCacheNamespace::EndPutBatch(PutBatchHandle* Batch) noexcept
{
try
{
@@ -165,6 +165,50 @@ ZenCacheNamespace::EndPutBatch(BatchHandle* Batch) noexcept
}
}
+// Namespace level state for a batched Get: the shared result vector and the disk layer batch that fills it.
+struct ZenCacheNamespace::GetBatchHandle
+{
+    GetBatchHandle(std::vector<ZenCacheValue>& OutResult) : Results(OutResult) {}
+
+    // Result storage handed in at construction; held by reference
+    std::vector<ZenCacheValue>& Results;
+    // Set by BeginGetBatch() right after construction
+    ZenCacheDiskLayer::GetBatchHandle* DiskLayerHandle = nullptr;
+};
+
+// Starts a batched Get for this namespace: allocates the namespace handle and opens the matching disk
+// layer batch on the same result vector.
+ZenCacheNamespace::GetBatchHandle*
+ZenCacheNamespace::BeginGetBatch(std::vector<ZenCacheValue>& OutResult)
+{
+    GetBatchHandle* NewBatch  = new GetBatchHandle(OutResult);
+    NewBatch->DiskLayerHandle = m_DiskLayer.BeginGetBatch(OutResult);
+    return NewBatch;
+}
+
+// Ends the disk layer batch, updates this namespace's hit/miss counters from the filled result vector
+// and releases Batch.
+void
+ZenCacheNamespace::EndGetBatch(GetBatchHandle* Batch) noexcept
+{
+    try
+    {
+        ZEN_ASSERT(Batch);
+        m_DiskLayer.EndGetBatch(Batch->DiskLayerHandle);
+
+        metrics::RequestStats::Scope StatsScope(m_GetOps, 0);
+        for (const ZenCacheValue& Result : Batch->Results)
+        {
+            if (Result.Value)
+            {
+                m_HitCount++;
+                StatsScope.SetBytes(Result.Value.Size());
+            }
+            else
+            {
+                m_MissCount++;
+            }
+        }
+    }
+    catch (std::exception& Ex)
+    {
+        ZEN_ERROR("Exception in cache namespace layer when ending batch get operation: '{}'", Ex.what());
+    }
+
+    // Released outside the try block so the handle is freed even when an exception was caught above
+    delete Batch;
+}
+
bool
ZenCacheNamespace::Get(std::string_view InBucket, const IoHash& HashKey, ZenCacheValue& OutValue)
{
@@ -173,7 +217,6 @@ ZenCacheNamespace::Get(std::string_view InBucket, const IoHash& HashKey, ZenCach
metrics::RequestStats::Scope StatsScope(m_GetOps, 0);
bool Ok = m_DiskLayer.Get(InBucket, HashKey, OutValue);
-
if (Ok)
{
ZEN_ASSERT(OutValue.Value.Size());
@@ -188,11 +231,22 @@ ZenCacheNamespace::Get(std::string_view InBucket, const IoHash& HashKey, ZenCach
}
+// Queues a lookup of HashKey in bucket InBucket on the namespace batch by forwarding it to the disk
+// layer batch. The result becomes available when the batch is ended.
void
+ZenCacheNamespace::Get(std::string_view InBucket, const IoHash& HashKey, GetBatchHandle& BatchHandle)
+{
+    ZEN_TRACE_CPU("Z$::Namespace::GetBatched");
+
+    metrics::RequestStats::Scope StatsScope(m_GetOps, 0);
+
+    ZenCacheDiskLayer::GetBatchHandle& DiskLayerBatch = *BatchHandle.DiskLayerHandle;
+    m_DiskLayer.Get(InBucket, HashKey, DiskLayerBatch);
+}
+
+void
ZenCacheNamespace::Put(std::string_view InBucket,
const IoHash& HashKey,
const ZenCacheValue& Value,
std::span<IoHash> References,
- BatchHandle* OptionalBatchHandle)
+ PutBatchHandle* OptionalBatchHandle)
{
ZEN_TRACE_CPU("Z$::Namespace::Put");
@@ -202,7 +256,8 @@ ZenCacheNamespace::Put(std::string_view InBucket,
ZEN_ASSERT(Value.Value.Size());
- m_DiskLayer.Put(InBucket, HashKey, Value, References, OptionalBatchHandle ? OptionalBatchHandle->DiskLayerHandle : nullptr);
+ ZenCacheDiskLayer::PutBatchHandle* DiskLayerBatchHandle = OptionalBatchHandle ? OptionalBatchHandle->DiskLayerHandle : nullptr;
+ m_DiskLayer.Put(InBucket, HashKey, Value, References, DiskLayerBatchHandle);
m_WriteCount++;
}
@@ -500,6 +555,43 @@ ZenCacheStore::PutBatch::~PutBatch()
}
}
+// Opens a batched Get against namespace InNamespace of CacheStore. If the namespace cannot be resolved no
+// namespace batch is started.
+ZenCacheStore::GetBatch::GetBatch(ZenCacheStore& CacheStore, std::string_view InNamespace, std::vector<ZenCacheValue>& OutResult)
+: m_CacheStore(CacheStore)
+, Results(OutResult)
+{
+    m_Store = m_CacheStore.GetNamespace(InNamespace);
+    if (m_Store)
+    {
+        m_NamespaceBatchHandle = m_Store->BeginGetBatch(OutResult);
+    }
+}
+
+// Ends the namespace batch, which fills Results, and then updates the store level hit/miss counters from
+// those results. Exceptions are logged and not propagated out of the destructor.
+ZenCacheStore::GetBatch::~GetBatch()
+{
+    try
+    {
+        if (m_Store)
+        {
+            ZEN_ASSERT(m_NamespaceBatchHandle);
+            m_Store->EndGetBatch(m_NamespaceBatchHandle);
+
+            metrics::RequestStats::Scope OpScope(m_CacheStore.m_GetOps, 0);
+            for (const ZenCacheValue& Result : Results)
+            {
+                if (Result.Value)
+                {
+                    m_CacheStore.m_HitCount++;
+                    OpScope.SetBytes(Result.Value.GetSize());
+                }
+                else
+                {
+                    // Count a miss only for empty results; a hit must not be counted as a miss as well
+                    m_CacheStore.m_MissCount++;
+                }
+            }
+        }
+    }
+    catch (std::exception& Ex)
+    {
+        ZEN_ERROR("Exception in cache store when ending batch get operation: '{}'", Ex.what());
+    }
+}
+
bool
ZenCacheStore::Get(const CacheRequestContext& Context,
std::string_view Namespace,
@@ -562,6 +654,39 @@ ZenCacheStore::Get(const CacheRequestContext& Context,
}
+// Queues a lookup of Namespace/Bucket/HashKey on the batch. Every call claims exactly one slot in the
+// batch's result vector - including the rejected and unknown-namespace paths, where the slot is left
+// empty - so results line up with the order the lookups were queued in.
void
+ZenCacheStore::Get(const CacheRequestContext& Context,
+                   std::string_view           Namespace,
+                   std::string_view           Bucket,
+                   const IoHash&              HashKey,
+                   GetBatch&                  BatchHandle)
+{
+    // Ad hoc rejection of known bad usage patterns for DDC bucket names
+
+    if (IsKnownBadBucketName(Bucket))
+    {
+        m_RejectedReadCount++;
+        // Keep the result vector aligned with the requests: a rejected read yields an empty slot.
+        // NOTE(review): when the batch ends, this empty slot is also counted as a miss - confirm that is acceptable
+        BatchHandle.Results.push_back({});
+        return;
+    }
+    ZEN_TRACE_CPU("Z$::Get");
+
+    metrics::RequestStats::Scope OpScope(m_GetOps, 0);
+
+    if (ZenCacheNamespace* Store = GetNamespace(Namespace); Store)
+    {
+        Store->Get(Bucket, HashKey, *BatchHandle.m_NamespaceBatchHandle);
+        return;
+    }
+
+    ZEN_WARN("request for unknown namespace '{}' in ZenCacheStore::Get [{}], bucket '{}', key '{}'",
+             Context,
+             Namespace,
+             Bucket,
+             HashKey.ToHexString());
+
+    m_MissCount++;
+    // Unknown namespace: nothing gets queued, so claim an empty slot here to keep the results aligned
+    BatchHandle.Results.push_back({});
+}
+
+void
ZenCacheStore::Put(const CacheRequestContext& Context,
std::string_view Namespace,
std::string_view Bucket,
@@ -603,7 +728,7 @@ ZenCacheStore::Put(const CacheRequestContext& Context,
if (ZenCacheNamespace* Store = GetNamespace(Namespace); Store)
{
- ZenCacheNamespace::BatchHandle* BatchHandle = OptionalBatchHandle ? OptionalBatchHandle->m_NamespaceBatchHandle : nullptr;
+ ZenCacheNamespace::PutBatchHandle* BatchHandle = OptionalBatchHandle ? OptionalBatchHandle->m_NamespaceBatchHandle : nullptr;
Store->Put(Bucket, HashKey, Value, References, BatchHandle);
m_WriteCount++;
return;