diff options
| author | Dan Engelbrecht <[email protected]> | 2025-03-14 09:50:00 +0100 |
|---|---|---|
| committer | Dan Engelbrecht <[email protected]> | 2025-03-14 09:50:00 +0100 |
| commit | 55c67aec301cfc99178ab54c6366cbc88f35d46a (patch) | |
| tree | 84b4c73220f7dd041763b6d1919eedc8d0b90844 /src/zenstore/cache | |
| parent | Merge remote-tracking branch 'origin/de/zen-service-command' into de/zen-serv... (diff) | |
| parent | fix quoted command lines arguments (#306) (diff) | |
| download | zen-55c67aec301cfc99178ab54c6366cbc88f35d46a.tar.xz zen-55c67aec301cfc99178ab54c6366cbc88f35d46a.zip | |
Merge remote-tracking branch 'origin/main' into de/zen-service-command
Diffstat (limited to 'src/zenstore/cache')
| -rw-r--r-- | src/zenstore/cache/cachedisklayer.cpp | 121 | ||||
| -rw-r--r-- | src/zenstore/cache/cacherpc.cpp | 73 | ||||
| -rw-r--r-- | src/zenstore/cache/structuredcachestore.cpp | 8 |
3 files changed, 113 insertions, 89 deletions
diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp index 25f68330a..61552fafc 100644 --- a/src/zenstore/cache/cachedisklayer.cpp +++ b/src/zenstore/cache/cachedisklayer.cpp @@ -708,11 +708,11 @@ namespace zen { ZenCacheDiskLayer::CacheBucket::CacheBucket(GcManager& Gc, std::atomic_uint64_t& OuterCacheMemoryUsage, - std::string BucketName, + std::string_view BucketName, const BucketConfiguration& Config) : m_Gc(Gc) , m_OuterCacheMemoryUsage(OuterCacheMemoryUsage) -, m_BucketName(std::move(BucketName)) +, m_BucketName(BucketName) , m_Configuration(Config) , m_BucketId(Oid::Zero) { @@ -1329,7 +1329,7 @@ ZenCacheDiskLayer::CacheBucket::EndPutBatch(PutBatchHandle* Batch) noexcept struct ZenCacheDiskLayer::CacheBucket::GetBatchHandle { - GetBatchHandle(std::vector<ZenCacheValue>& OutResults) : OutResults(OutResults) + GetBatchHandle(ZenCacheValueVec_t& OutResults) : OutResults(OutResults) { Keys.reserve(OutResults.capacity()); ResultIndexes.reserve(OutResults.capacity()); @@ -1340,11 +1340,11 @@ struct ZenCacheDiskLayer::CacheBucket::GetBatchHandle std::vector<IoHash> Keys; std::vector<size_t> ResultIndexes; - std::vector<ZenCacheValue>& OutResults; + ZenCacheValueVec_t& OutResults; }; ZenCacheDiskLayer::CacheBucket::GetBatchHandle* -ZenCacheDiskLayer::CacheBucket::BeginGetBatch(std::vector<ZenCacheValue>& OutResult) +ZenCacheDiskLayer::CacheBucket::BeginGetBatch(ZenCacheValueVec_t& OutResult) { ZEN_TRACE_CPU("Z$::Bucket::BeginGetBatch"); return new GetBatchHandle(OutResult); @@ -1364,13 +1364,13 @@ ZenCacheDiskLayer::CacheBucket::EndGetBatch(GetBatchHandle* Batch) noexcept if (!Batch->ResultIndexes.empty()) { - std::vector<DiskLocation> StandaloneDiskLocations; - std::vector<size_t> StandaloneKeyIndexes; - std::vector<size_t> MemCachedKeyIndexes; - std::vector<DiskLocation> InlineDiskLocations; - std::vector<BlockStoreLocation> InlineBlockLocations; - std::vector<size_t> InlineKeyIndexes; - std::vector<bool> FillRawHashAndRawSize(Batch->Keys.size(), false); + eastl::fixed_vector<DiskLocation, 16> StandaloneDiskLocations; + eastl::fixed_vector<size_t, 16> StandaloneKeyIndexes; + eastl::fixed_vector<size_t, 16> MemCachedKeyIndexes; + eastl::fixed_vector<DiskLocation, 16> InlineDiskLocations; + eastl::fixed_vector<BlockStoreLocation, 16> InlineBlockLocations; + eastl::fixed_vector<size_t, 16> InlineKeyIndexes; + eastl::fixed_vector<bool, 16> FillRawHashAndRawSize(Batch->Keys.size(), false); { RwLock::SharedLockScope IndexLock(m_IndexLock); for (size_t KeyIndex = 0; KeyIndex < Batch->Keys.size(); KeyIndex++) @@ -1526,33 +1526,35 @@ ZenCacheDiskLayer::CacheBucket::EndGetBatch(GetBatchHandle* Batch) noexcept if (!InlineDiskLocations.empty()) { ZEN_TRACE_CPU("Z$::Bucket::EndGetBatch::ReadInline"); - m_BlockStore.IterateChunks(InlineBlockLocations, [&](uint32_t, std::span<const size_t> ChunkIndexes) -> bool { - // Only read into memory the IoBuffers we could potentially add to memcache - const uint64_t LargeChunkSizeLimit = Max(m_Configuration.MemCacheSizeThreshold, 1u * 1024u); - m_BlockStore.IterateBlock( - InlineBlockLocations, - ChunkIndexes, - [this, &FillOne, &InlineDiskLocations, &InlineKeyIndexes](size_t ChunkIndex, - const void* Data, - uint64_t Size) -> bool { - if (Data != nullptr) - { - FillOne(InlineDiskLocations[ChunkIndex], - InlineKeyIndexes[ChunkIndex], - IoBufferBuilder::MakeCloneFromMemory(Data, Size)); - } - return true; - }, - [this, &FillOne, &InlineDiskLocations, &InlineKeyIndexes](size_t ChunkIndex, - BlockStoreFile& File, - uint64_t Offset, - uint64_t Size) -> bool { - FillOne(InlineDiskLocations[ChunkIndex], InlineKeyIndexes[ChunkIndex], File.GetChunk(Offset, Size)); - return true; - }, - LargeChunkSizeLimit); - return true; - }); + m_BlockStore.IterateChunks( + std::span{begin(InlineBlockLocations), end(InlineBlockLocations)}, + [&](uint32_t, std::span<const size_t> ChunkIndexes) -> bool { + // Only read into memory the IoBuffers we could potentially add to memcache + const uint64_t LargeChunkSizeLimit = Max(m_Configuration.MemCacheSizeThreshold, 1u * 1024u); + m_BlockStore.IterateBlock( + std::span{begin(InlineBlockLocations), end(InlineBlockLocations)}, + ChunkIndexes, + [this, &FillOne, &InlineDiskLocations, &InlineKeyIndexes](size_t ChunkIndex, + const void* Data, + uint64_t Size) -> bool { + if (Data != nullptr) + { + FillOne(InlineDiskLocations[ChunkIndex], + InlineKeyIndexes[ChunkIndex], + IoBufferBuilder::MakeCloneFromMemory(Data, Size)); + } + return true; + }, + [this, &FillOne, &InlineDiskLocations, &InlineKeyIndexes](size_t ChunkIndex, + BlockStoreFile& File, + uint64_t Offset, + uint64_t Size) -> bool { + FillOne(InlineDiskLocations[ChunkIndex], InlineKeyIndexes[ChunkIndex], File.GetChunk(Offset, Size)); + return true; + }, + LargeChunkSizeLimit); + return true; + }); } if (!StandaloneDiskLocations.empty()) @@ -3581,15 +3583,29 @@ ZenCacheDiskLayer::~ZenCacheDiskLayer() } } +template<typename T, typename U> +struct equal_to_2 : public eastl::binary_function<T, U, bool> +{ + constexpr bool operator()(const T& a, const U& b) const { return a == b; } + + template<typename T_ = T, + typename U_ = U, + typename = eastl::enable_if_t<!eastl::is_same_v<eastl::remove_const_t<T_>, eastl::remove_const_t<U_>>>> + constexpr bool operator()(const U& b, const T& a) const + { + return b == a; + } +}; + ZenCacheDiskLayer::CacheBucket* ZenCacheDiskLayer::GetOrCreateBucket(std::string_view InBucket) { ZEN_TRACE_CPU("Z$::GetOrCreateBucket"); - const auto BucketName = std::string(InBucket); { RwLock::SharedLockScope SharedLock(m_Lock); - if (auto It = m_Buckets.find(BucketName); It != m_Buckets.end()) + if (auto It = m_Buckets.find_as(InBucket, std::hash<std::string_view>(), equal_to_2<std::string, std::string_view>()); + It != m_Buckets.end()) { return It->second.get(); } @@ -3597,31 +3613,32 @@ ZenCacheDiskLayer::GetOrCreateBucket(std::string_view InBucket) // We create the bucket without holding a lock since contructor calls GcManager::AddGcReferencer which takes an exclusive lock. // This can cause a deadlock, if GC is running we would block while holding ZenCacheDiskLayer::m_Lock - std::unique_ptr<CacheBucket> Bucket( - std::make_unique<CacheBucket>(m_Gc, m_TotalMemCachedSize, BucketName, m_Configuration.BucketConfig)); + std::unique_ptr<CacheBucket> Bucket(std::make_unique<CacheBucket>(m_Gc, m_TotalMemCachedSize, InBucket, m_Configuration.BucketConfig)); RwLock::ExclusiveLockScope Lock(m_Lock); - if (auto It = m_Buckets.find(BucketName); It != m_Buckets.end()) + if (auto It = m_Buckets.find_as(InBucket, std::hash<std::string_view>(), equal_to_2<std::string, std::string_view>()); + It != m_Buckets.end()) { return It->second.get(); } std::filesystem::path BucketPath = m_RootDir; - BucketPath /= BucketName; + BucketPath /= InBucket; try { if (!Bucket->OpenOrCreate(BucketPath)) { - ZEN_WARN("Found directory '{}' in our base directory '{}' but it is not a valid bucket", BucketName, m_RootDir); + ZEN_WARN("Found directory '{}' in our base directory '{}' but it is not a valid bucket", InBucket, m_RootDir); return nullptr; } } catch (const std::exception& Err) { - ZEN_WARN("Creating bucket '{}' in '{}' FAILED, reason: '{}'", BucketName, BucketPath, Err.what()); + ZEN_WARN("Creating bucket '{}' in '{}' FAILED, reason: '{}'", InBucket, BucketPath, Err.what()); throw; } + std::string BucketName{InBucket}; CacheBucket* Result = Bucket.get(); m_Buckets.emplace(BucketName, std::move(Bucket)); if (m_CapturedBuckets) @@ -3720,7 +3737,7 @@ ZenCacheDiskLayer::EndPutBatch(PutBatchHandle* Batch) noexcept struct ZenCacheDiskLayer::GetBatchHandle { - GetBatchHandle(std::vector<ZenCacheValue>& OutResults) : OutResults(OutResults) {} + GetBatchHandle(ZenCacheValueVec_t& OutResults) : OutResults(OutResults) {} struct BucketHandle { CacheBucket* Bucket; @@ -3780,13 +3797,13 @@ struct ZenCacheDiskLayer::GetBatchHandle return NewBucketHandle; } - RwLock Lock; - std::vector<BucketHandle> BucketHandles; - std::vector<ZenCacheValue>& OutResults; + RwLock Lock; + eastl::fixed_vector<BucketHandle, 4> BucketHandles; + ZenCacheValueVec_t& OutResults; }; ZenCacheDiskLayer::GetBatchHandle* -ZenCacheDiskLayer::BeginGetBatch(std::vector<ZenCacheValue>& OutResults) +ZenCacheDiskLayer::BeginGetBatch(ZenCacheValueVec_t& OutResults) { return new GetBatchHandle(OutResults); } diff --git a/src/zenstore/cache/cacherpc.cpp b/src/zenstore/cache/cacherpc.cpp index cca51e63e..97e26a38d 100644 --- a/src/zenstore/cache/cacherpc.cpp +++ b/src/zenstore/cache/cacherpc.cpp @@ -20,6 +20,8 @@ #include <zencore/memory/llm.h> +#include <EASTL/fixed_vector.h> + ////////////////////////////////////////////////////////////////////////// namespace zen { @@ -89,7 +91,7 @@ GetRpcRequestCacheKey(const CbObjectView& KeyView, CacheKey& Key) return false; } IoHash Hash = HashField.AsHash(); - Key = CacheKey::Create(*Bucket, Hash); + Key = CacheKey::CreateValidated(std::move(*Bucket), Hash); return true; } @@ -305,7 +307,7 @@ CacheRpcHandler::HandleRpcPutCacheRecords(const CacheRequestContext& Context, co } DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default; - std::vector<bool> Results; + eastl::fixed_vector<bool, 32> Results; CbArrayView RequestsArray = Params["Requests"sv].AsArrayView(); for (CbFieldView RequestField : RequestsArray) @@ -481,16 +483,15 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb bool Exists = false; bool ReadFromUpstream = false; }; - struct RecordRequestData + struct RecordRequestData : public CacheKeyRequest { - CacheKeyRequest Upstream; - CbObjectView RecordObject; - IoBuffer RecordCacheValue; - CacheRecordPolicy DownstreamPolicy; - std::vector<ValueRequestData> Values; - bool Complete = false; - const UpstreamEndpointInfo* Source = nullptr; - uint64_t ElapsedTimeUs; + CbObjectView RecordObject; + IoBuffer RecordCacheValue; + CacheRecordPolicy DownstreamPolicy; + eastl::fixed_vector<ValueRequestData, 4> Values; + bool Complete = false; + const UpstreamEndpointInfo* Source = nullptr; + uint64_t ElapsedTimeUs; }; std::string_view PolicyText = Params["DefaultPolicy"sv].AsString(); @@ -503,8 +504,8 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb const bool HasUpstream = m_UpstreamCache.IsActive(); - std::vector<RecordRequestData> Requests; - std::vector<size_t> UpstreamIndexes; + eastl::fixed_vector<RecordRequestData, 16> Requests; + eastl::fixed_vector<size_t, 16> UpstreamIndexes; auto ParseValues = [](RecordRequestData& Request) { CbArrayView ValuesArray = Request.RecordObject["Values"sv].AsArrayView(); @@ -535,7 +536,7 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb CbObjectView RequestObject = RequestField.AsObjectView(); CbObjectView KeyObject = RequestObject["Key"sv].AsObjectView(); - CacheKey& Key = Request.Upstream.Key; + CacheKey& Key = Request.Key; if (!GetRpcRequestCacheKey(KeyObject, Key)) { return CbPackage{}; @@ -707,7 +708,7 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb for (size_t Index : UpstreamIndexes) { RecordRequestData& Request = Requests[Index]; - UpstreamRequests.push_back(&Request.Upstream); + UpstreamRequests.push_back(&Request); if (Request.Values.size()) { @@ -721,13 +722,13 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb UpstreamPolicy |= !Value.ReadFromUpstream ? CachePolicy::SkipData : CachePolicy::None; Builder.AddValuePolicy(Value.ValueId, UpstreamPolicy); } - Request.Upstream.Policy = Builder.Build(); + Request.Policy = Builder.Build(); } else { // We don't know which Values exist in the Record; ask the upstrem for all values that the client wants, // and convert the CacheRecordPolicy to an upstream policy - Request.Upstream.Policy = Request.DownstreamPolicy.ConvertToUpstream(); + Request.Policy = Request.DownstreamPolicy.ConvertToUpstream(); } } @@ -737,10 +738,9 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb return; } - RecordRequestData& Request = - *reinterpret_cast<RecordRequestData*>(reinterpret_cast<char*>(&Params.Request) - offsetof(RecordRequestData, Upstream)); + RecordRequestData& Request = *static_cast<RecordRequestData*>(&Params.Request); Request.ElapsedTimeUs += static_cast<uint64_t>(Params.ElapsedSeconds * 1000000.0); - const CacheKey& Key = Request.Upstream.Key; + const CacheKey& Key = Request.Key; Stopwatch Timer; auto TimeGuard = MakeGuard([&Timer, &Request]() { Request.ElapsedTimeUs += Timer.GetElapsedTimeUs(); }); if (!Request.RecordObject) @@ -832,10 +832,12 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb CbPackage ResponsePackage; CbObjectWriter ResponseObject{2048}; + ResponsePackage.ReserveAttachments(Requests.size()); + ResponseObject.BeginArray("Result"sv); for (RecordRequestData& Request : Requests) { - const CacheKey& Key = Request.Upstream.Key; + const CacheKey& Key = Request.Key; if (Request.Complete || (Request.RecordObject && EnumHasAllFlags(Request.DownstreamPolicy.GetRecordPolicy(), CachePolicy::PartialRecord))) { @@ -910,11 +912,12 @@ CacheRpcHandler::HandleRpcPutCacheValues(const CacheRequestContext& Context, con const bool HasUpstream = m_UpstreamCache.IsActive(); CbArrayView RequestsArray = Params["Requests"sv].AsArrayView(); - std::vector<bool> BatchResults; - std::vector<size_t> BatchResultIndexes; - std::vector<bool> Results; - std::vector<CacheKey> UpstreamCacheKeys; - uint64_t RequestCount = RequestsArray.Num(); + std::vector<bool> BatchResults; + eastl::fixed_vector<size_t, 32> BatchResultIndexes; + eastl::fixed_vector<bool, 32> Results; + eastl::fixed_vector<CacheKey, 32> UpstreamCacheKeys; + + uint64_t RequestCount = RequestsArray.Num(); { Results.reserve(RequestCount); std::unique_ptr<ZenCacheStore::PutBatch> Batch; @@ -1099,15 +1102,15 @@ CacheRpcHandler::HandleRpcGetCacheValues(const CacheRequestContext& Context, CbO uint64_t RawSize = 0; CompressedBuffer Result; }; - std::vector<RequestData> Requests; + eastl::fixed_vector<RequestData, 16> Requests; - std::vector<size_t> RemoteRequestIndexes; + eastl::fixed_vector<size_t, 16> RemoteRequestIndexes; const bool HasUpstream = m_UpstreamCache.IsActive(); - CbArrayView RequestsArray = Params["Requests"sv].AsArrayView(); - std::vector<ZenCacheValue> CacheValues; - const uint64_t RequestCount = RequestsArray.Num(); + CbArrayView RequestsArray = Params["Requests"sv].AsArrayView(); + ZenCacheValueVec_t CacheValues; + const uint64_t RequestCount = RequestsArray.Num(); CacheValues.reserve(RequestCount); { std::unique_ptr<ZenCacheStore::GetBatch> Batch; @@ -1136,7 +1139,6 @@ CacheRpcHandler::HandleRpcGetCacheValues(const CacheRequestContext& Context, CbO CacheKey& Key = Request.Key; CachePolicy Policy = Request.Policy; - ZenCacheValue CacheValue; if (EnumHasAllFlags(Policy, CachePolicy::QueryLocal)) { if (Batch) @@ -1276,6 +1278,9 @@ CacheRpcHandler::HandleRpcGetCacheValues(const CacheRequestContext& Context, CbO ZEN_TRACE_CPU("Z$::RpcGetCacheValues::Response"); CbPackage RpcResponse; CbObjectWriter ResponseObject{1024}; + + RpcResponse.ReserveAttachments(Requests.size()); + ResponseObject.BeginArray("Result"sv); for (const RequestData& Request : Requests) { @@ -1642,7 +1647,7 @@ CacheRpcHandler::GetLocalCacheValues(const CacheRequestContext& Context, using namespace cache::detail; const bool HasUpstream = m_UpstreamCache.IsActive(); - std::vector<ZenCacheValue> Chunks; + ZenCacheValueVec_t Chunks; Chunks.reserve(ValueRequests.size()); { std::unique_ptr<ZenCacheStore::GetBatch> Batch; @@ -1796,6 +1801,8 @@ CacheRpcHandler::WriteGetCacheChunksResponse([[maybe_unused]] const CacheRequest CbPackage RpcResponse; CbObjectWriter Writer{1024}; + RpcResponse.ReserveAttachments(Requests.size()); + Writer.BeginArray("Result"sv); for (ChunkRequest& Request : Requests) { diff --git a/src/zenstore/cache/structuredcachestore.cpp b/src/zenstore/cache/structuredcachestore.cpp index 133cb42d7..7d277329e 100644 --- a/src/zenstore/cache/structuredcachestore.cpp +++ b/src/zenstore/cache/structuredcachestore.cpp @@ -178,13 +178,13 @@ ZenCacheNamespace::EndPutBatch(PutBatchHandle* Batch) noexcept struct ZenCacheNamespace::GetBatchHandle { - GetBatchHandle(std::vector<ZenCacheValue>& OutResult) : Results(OutResult) {} - std::vector<ZenCacheValue>& Results; + GetBatchHandle(ZenCacheValueVec_t& OutResult) : Results(OutResult) {} + ZenCacheValueVec_t& Results; ZenCacheDiskLayer::GetBatchHandle* DiskLayerHandle = nullptr; }; ZenCacheNamespace::GetBatchHandle* -ZenCacheNamespace::BeginGetBatch(std::vector<ZenCacheValue>& OutResult) +ZenCacheNamespace::BeginGetBatch(ZenCacheValueVec_t& OutResult) { ZenCacheNamespace::GetBatchHandle* Handle = new ZenCacheNamespace::GetBatchHandle(OutResult); Handle->DiskLayerHandle = m_DiskLayer.BeginGetBatch(OutResult); @@ -580,7 +580,7 @@ ZenCacheStore::PutBatch::~PutBatch() } } -ZenCacheStore::GetBatch::GetBatch(ZenCacheStore& CacheStore, std::string_view InNamespace, std::vector<ZenCacheValue>& OutResult) +ZenCacheStore::GetBatch::GetBatch(ZenCacheStore& CacheStore, std::string_view InNamespace, ZenCacheValueVec_t& OutResult) : m_CacheStore(CacheStore) , Results(OutResult) { |