diff options
| author | Dan Engelbrecht <[email protected]> | 2024-05-02 10:53:15 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2024-05-02 10:53:15 +0200 |
| commit | 1bafcb32cb48b2256a9d72995388b7df72058039 (patch) | |
| tree | 2385ec3de36c4f45018832eb36bcab6ac2d3670f /src/zenstore/cache/cachedisklayer.cpp | |
| parent | fix get project files loop (#68) (diff) | |
| download | zen-1bafcb32cb48b2256a9d72995388b7df72058039.tar.xz zen-1bafcb32cb48b2256a9d72995388b7df72058039.zip | |
batch cache put (#67)
- Improvement: Batch scope for put of cache values
Diffstat (limited to 'src/zenstore/cache/cachedisklayer.cpp')
| -rw-r--r-- | src/zenstore/cache/cachedisklayer.cpp | 213 |
1 files changed, 207 insertions, 6 deletions
diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp index 51d547b3d..a497c8969 100644 --- a/src/zenstore/cache/cachedisklayer.cpp +++ b/src/zenstore/cache/cachedisklayer.cpp @@ -1162,6 +1162,108 @@ ZenCacheDiskLayer::CacheBucket::GetStandaloneCacheValue(const DiskLocation& Loc, return {}; } +struct ZenCacheDiskLayer::CacheBucket::BatchHandle +{ + BatchHandle(std::vector<bool>& OutResults) : OutResults(OutResults) {} + struct Entry + { + std::vector<IoHash> HashKeyAndReferences; + }; + std::vector<IoBuffer> Buffers; + std::vector<Entry> Entries; + std::vector<size_t> EntryResultIndexes; + + std::vector<bool>& OutResults; +}; + +ZenCacheDiskLayer::CacheBucket::BatchHandle* +ZenCacheDiskLayer::CacheBucket::BeginPutBatch(std::vector<bool>& OutResults) +{ + return new BatchHandle(OutResults); +} + +void +ZenCacheDiskLayer::CacheBucket::EndPutBatch(BatchHandle* Batch) noexcept +{ + try + { + ZEN_ASSERT(Batch); + if (!Batch->Buffers.empty()) + { + std::vector<uint8_t> EntryFlags; + for (const IoBuffer& Buffer : Batch->Buffers) + { + uint8_t Flags = 0; + if (Buffer.GetContentType() == ZenContentType::kCbObject) + { + Flags |= DiskLocation::kStructured; + } + else if (Buffer.GetContentType() == ZenContentType::kCompressedBinary) + { + Flags |= DiskLocation::kCompressed; + } + EntryFlags.push_back(Flags); + } + + size_t IndexOffset = 0; + m_BlockStore.WriteChunks(Batch->Buffers, m_Configuration.PayloadAlignment, [&](std::span<BlockStoreLocation> Locations) { + std::vector<DiskIndexEntry> DiskEntries; + { + RwLock::ExclusiveLockScope IndexLock(m_IndexLock); + for (size_t Index = 0; Index < Locations.size(); Index++) + { + DiskLocation Location(Locations[Index], m_Configuration.PayloadAlignment, EntryFlags[IndexOffset + Index]); + const std::vector<IoHash>& HashKeyAndReferences = Batch->Entries[IndexOffset + Index].HashKeyAndReferences; + ZEN_ASSERT(HashKeyAndReferences.size() > 1); + const IoHash HashKey = HashKeyAndReferences[0]; + DiskEntries.push_back({.Key = HashKey, .Location = Location}); + if (m_TrackedCacheKeys) + { + m_TrackedCacheKeys->insert(HashKey); + } + if (m_TrackedReferences && HashKeyAndReferences.size() > 1) + { + m_TrackedReferences->insert(HashKeyAndReferences.begin() + 1, HashKeyAndReferences.end()); + } + if (auto It = m_Index.find(HashKey); It != m_Index.end()) + { + PayloadIndex EntryIndex = It.value(); + ZEN_ASSERT_SLOW(EntryIndex < PayloadIndex(m_AccessTimes.size())); + BucketPayload& Payload = m_Payloads[EntryIndex]; + + RemoveMemCachedData(IndexLock, Payload); + RemoveMetaData(IndexLock, Payload); + + Payload = (BucketPayload{.Location = Location}); + m_AccessTimes[EntryIndex] = GcClock::TickCount(); + } + else + { + PayloadIndex EntryIndex = PayloadIndex(m_Payloads.size()); + m_Payloads.emplace_back(BucketPayload{.Location = Location}); + m_AccessTimes.emplace_back(GcClock::TickCount()); + m_Index.insert_or_assign(HashKey, EntryIndex); + } + } + } + m_SlogFile.Append(DiskEntries); + for (size_t Index = 0; Index < Locations.size(); Index++) + { + size_t ResultIndex = Batch->EntryResultIndexes[IndexOffset + Index]; + ZEN_ASSERT(ResultIndex < Batch->OutResults.size()); + Batch->OutResults[ResultIndex] = true; + } + IndexOffset += Locations.size(); + }); + } + delete Batch; + } + catch (std::exception& Ex) + { + ZEN_ERROR("Exception in cache bucket when ending batch put operation: '{}'", Ex.what()); + } +} + bool ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutValue) { @@ -1283,7 +1385,10 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal } void -ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References) +ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, + const ZenCacheValue& Value, + std::span<IoHash> References, + BatchHandle* OptionalBatchHandle) { ZEN_TRACE_CPU("Z$::Bucket::Put"); @@ -1292,10 +1397,14 @@ ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& if (Value.Value.Size() >= m_Configuration.LargeObjectThreshold) { PutStandaloneCacheValue(HashKey, Value, References); + if (OptionalBatchHandle) + { + OptionalBatchHandle->OutResults.push_back(true); + } } else { - PutInlineCacheValue(HashKey, Value, References); + PutInlineCacheValue(HashKey, Value, References, OptionalBatchHandle); } m_DiskWriteCount++; @@ -2593,10 +2702,24 @@ ZenCacheDiskLayer::CacheBucket::GetMetaData(RwLock::SharedLockScope&, const Buck } void -ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References) +ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, + const ZenCacheValue& Value, + std::span<IoHash> References, + BatchHandle* OptionalBatchHandle) { ZEN_TRACE_CPU("Z$::Bucket::PutInlineCacheValue"); - + if (OptionalBatchHandle != nullptr) + { + OptionalBatchHandle->Buffers.push_back(Value.Value); + OptionalBatchHandle->Entries.push_back({}); + OptionalBatchHandle->EntryResultIndexes.push_back(OptionalBatchHandle->OutResults.size()); + OptionalBatchHandle->OutResults.push_back(false); + std::vector<IoHash>& HashKeyAndReferences = OptionalBatchHandle->Entries.back().HashKeyAndReferences; + HashKeyAndReferences.reserve(1 + HashKeyAndReferences.size()); + HashKeyAndReferences.push_back(HashKey); + HashKeyAndReferences.insert(HashKeyAndReferences.end(), HashKeyAndReferences.begin(), HashKeyAndReferences.end()); + return; + } uint8_t EntryFlags = 0; if (Value.Value.GetContentType() == ZenContentType::kCbObject) @@ -3423,6 +3546,79 @@ ZenCacheDiskLayer::GetOrCreateBucket(std::string_view InBucket) return Result; } +struct ZenCacheDiskLayer::BatchHandle +{ + BatchHandle(std::vector<bool>& OutResults) : OutResults(OutResults) {} + struct BucketHandle + { + CacheBucket* Bucket; + CacheBucket::BatchHandle* Handle; + }; + + void ForEach(const std::function<void(CacheBucket* Bucket, CacheBucket::BatchHandle* Handle)>& CB) noexcept + { + RwLock::SharedLockScope _(Lock); + for (ZenCacheDiskLayer::BatchHandle::BucketHandle& BucketHandle : BucketHandles) + { + ZEN_ASSERT(BucketHandle.Bucket); + ZEN_ASSERT(BucketHandle.Handle); + CB(BucketHandle.Bucket, BucketHandle.Handle); + } + } + + CacheBucket::BatchHandle* GetHandle(CacheBucket* Bucket) + { + { + RwLock::SharedLockScope _(Lock); + if (auto It = + std::find_if(BucketHandles.begin(), BucketHandles.end(), [&](BucketHandle& Handle) { return Handle.Bucket == Bucket; }); + It != BucketHandles.end()) + { + return It->Handle; + } + } + + CacheBucket::BatchHandle* NewBucketHandle = Bucket->BeginPutBatch(OutResults); + if (NewBucketHandle == nullptr) + { + return nullptr; + } + + RwLock::ExclusiveLockScope _(Lock); + if (auto It = + std::find_if(BucketHandles.begin(), BucketHandles.end(), [&](BucketHandle& Handle) { return Handle.Bucket == Bucket; }); + It != BucketHandles.end()) + { + CacheBucket::BatchHandle* Result = It->Handle; + ZEN_ASSERT(Result != nullptr); + _.ReleaseNow(); + Bucket->EndPutBatch(NewBucketHandle); + return Result; + } + + BucketHandles.push_back(ZenCacheDiskLayer::BatchHandle::BucketHandle{.Bucket = Bucket, .Handle = NewBucketHandle}); + + return NewBucketHandle; + } + RwLock Lock; + std::vector<BucketHandle> BucketHandles; + std::vector<bool>& OutResults; +}; + +ZenCacheDiskLayer::BatchHandle* +ZenCacheDiskLayer::BeginPutBatch(std::vector<bool>& OutResults) +{ + return new BatchHandle(OutResults); +} + +void +ZenCacheDiskLayer::EndPutBatch(BatchHandle* Batch) noexcept +{ + ZEN_ASSERT(Batch); + Batch->ForEach([&](CacheBucket* Bucket, CacheBucket::BatchHandle* Handle) { Bucket->EndPutBatch(Handle); }); + delete Batch; +} + bool ZenCacheDiskLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCacheValue& OutValue) { @@ -3440,13 +3636,18 @@ ZenCacheDiskLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCach } void -ZenCacheDiskLayer::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References) +ZenCacheDiskLayer::Put(std::string_view InBucket, + const IoHash& HashKey, + const ZenCacheValue& Value, + std::span<IoHash> References, + BatchHandle* OptionalBatchHandle) { ZEN_TRACE_CPU("Z$::Put"); if (CacheBucket* Bucket = GetOrCreateBucket(InBucket); Bucket != nullptr) { - Bucket->Put(HashKey, Value, References); + CacheBucket::BatchHandle* BucketBatchHandle = OptionalBatchHandle == nullptr ? nullptr : OptionalBatchHandle->GetHandle(Bucket); + Bucket->Put(HashKey, Value, References, BucketBatchHandle); TryMemCacheTrim(); } } |